
Conversation

@mccheah (Collaborator) commented Nov 22, 2016

Includes the following initial feature set:

  • Cluster mode with only Scala/Java jobs
  • Spark-submit support
  • Dynamic allocation

Does not include, most notably:

  • Client mode support
  • Proper testing at both the unit and integration levels; the integration
    tests are flaky

@mccheah (Collaborator, Author) commented Nov 22, 2016

@foxish this reflects the current work I have, but I think there's a commit on this fork's master that's preventing this from merging cleanly.

@foxish (Owner) commented Nov 22, 2016

@mccheah Thanks!
I'll revert the change on the master branch.

@erikerlandson (Collaborator) commented Nov 22, 2016

@mccheah thanks for posting your project; it's pleasing to see the community is broadly aligning on the approach!

Here are some initial thoughts.

  1. It looks like this implementation requires that a separate REST server (KubernetesDriverLauncherService) is operating inside the cluster. I have reservations about imposing a requirement for spinning up a server, as contrasted with the current prototype's ability to submit directly into a cluster. This also keeps the implementation lighter-weight.
  2. Did splitting into driver and executor image types allow a reduced image size? (currently I have it working with a single base spark image, that runs different commands from the pod config to get a driver or executor)
  3. I like that pod-watcher idiom - that could help simplify the logic of managing executor pods, particularly in the dynamic executor case.
  4. Nice that you've got secret passing; that is a to-do on our prototype
  5. Also cool that you have a shuffle service with a daemon-set mode. We had some questions regarding the performance impacts of a daemon-set vs. running alongside the executor in the same container (which is how I have it working now). Do you know of any experimental comparisons?
  6. I've been gravitating toward a CLI argument philosophy that passing new kube-related arguments can just be accomplished using --conf spark.conf.var=value. It works smoothly with any similar args residing in a configuration file, and also makes it possible to avoid some of the bespoke-argument-processing cruft (e.g. I've started excising KubernetesDriverDescription in our current prototype). It also works smoothly in the configuration forwarding logic (e.g. here). I'm interested in what people think of that philosophy (see the sketch after this list).
  7. Dynamic executors can be resized downwards as well as upwards in doRequestTotalExecutors
  8. I ended up supporting a service-account parameter, which I needed when running in non-admin mode, so that the driver had the authorization to create new executor pods. Have you run into that?
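As an illustration of the --conf philosophy in point 6 above, here is a minimal sketch only, with hypothetical spark.kubernetes.* property names (the actual keys in either prototype may differ):

  import org.apache.spark.SparkConf

  // Kubernetes-specific settings ride along as ordinary Spark configuration, so they
  // can come from --conf flags, spark-defaults.conf, or application code alike.
  val conf = new SparkConf()
    .set("spark.kubernetes.namespace", "spark-jobs")                        // hypothetical key
    .set("spark.kubernetes.executor.docker.image", "spark-executor:latest") // hypothetical key

  // The scheduler backend then reads them back with no bespoke argument parsing:
  val namespace = conf.get("spark.kubernetes.namespace", "default")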

@erikerlandson (Collaborator)

@mccheah can you write up quick instructions for the build & packaging workflow? That is, what commands do you run to build the repo, docker images, etc.

@mccheah (Collaborator, Author) commented Nov 22, 2016

@erikerlandson thanks for looking over this. Responding to some of your comments:

1 - The separate REST server is started automatically when using the Client class, and spark-submit will automatically create an instance of this REST server in the driver pod. The indirection is necessary to get files (jars, etc.) from the user's local machine to the driver pod. I'm open to suggestions regarding how to upload local files to the driver.

2 - The idea was not so much to reduce the image size but more to leave as much configuration as possible to the Dockerfile as opposed to the pod configuration from the driver. For example, when I'm creating the pod spec, I don't want to have to specify where JAVA_HOME is and where SPARK_HOME is. Or to put it another way, I only want to give the pod spec parameters that are specific to the application (driver URL, custom ports), and to leave any parameters that are common to all executors in the Dockerfile (JAVA_HOME, SPARK_HOME). It's a semantic nuance and I'm open to changing this; I've tried both approaches and I like the separation of concerns here, but it's definitely open to discussion.
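
As an illustration of that separation, a rough sketch using the fabric8 Kubernetes builder API (the image name and label key are made up, and this is not the actual code in the PR). Only application-specific values appear in the pod spec, while JAVA_HOME, SPARK_HOME, and the launch command stay in the image:

  import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

  // Only application-specific parameters go into the pod spec; everything common to all
  // executors (JAVA_HOME, SPARK_HOME, the launch command) is baked into the Docker image.
  def buildExecutorPod(appId: String, driverUrl: String, executorPort: Int): Pod =
    new PodBuilder()
      .withNewMetadata()
        .withName(s"$appId-exec")
        .addToLabels("spark-app-id", appId)   // hypothetical label key
        .endMetadata()
      .withNewSpec()
        .addNewContainer()
          .withName("executor")
          .withImage("spark-executor:latest") // hypothetical image name
          .addNewEnv().withName("SPARK_DRIVER_URL").withValue(driverUrl).endEnv()
          .addNewPort().withContainerPort(executorPort).endPort()
          .endContainer()
        .endSpec()
      .build()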

5 - There are some nuances with the daemon set that I haven't been able to work out, particularly since the shuffle service and the executors need to share the same disk space, which is why the code does some host path volume mounting. This isn't ideal since the user would have to provision the kubelet host directories beforehand, so I'm open to alternative solutions. As for performance, I haven't had the chance to work on benchmarking yet. There is an interesting discussion to be had about running the shuffle service in the same pod vs. a separate pod. For example, if the shuffle service were running in the same pod as the executor, there are questions around lifecycle management when executors shut down, since we need the shuffle service process to continue running even if the executors shut down. We would need to shut down just the executor container in the pod and leave the shuffle service container running, but then the cluster would have dangling shuffle service pods consuming resources, and it's unclear when applications can indicate that those pods can be shut down.
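
A rough sketch of the host path sharing described above, assuming the fabric8 builder API and a made-up directory name; the same volume definition and mount would be attached to both the shuffle-service daemonset pod and every executor pod:

  import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}

  // A node-local directory that the shuffle-service pod and the executor pods both mount;
  // the path is illustrative and has to be provisioned on the kubelet host beforehand.
  val shuffleVolume: Volume = new VolumeBuilder()
    .withName("spark-shuffle-dir")
    .withNewHostPath().withPath("/var/lib/spark-shuffle").endHostPath()
    .build()

  // The matching mount, added to the shuffle-service container and each executor container.
  val shuffleMount: VolumeMount = new VolumeMountBuilder()
    .withName("spark-shuffle-dir")
    .withMountPath("/var/lib/spark-shuffle")
    .build()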

6 - Yes, unifying all the configurations to use SparkConf seems reasonable.

7 - As far as I'm aware, doRequestTotalExecutors shouldn't actually kill any executors, but just set the desired number on the cluster manager and request additional executors if necessary. doKillExecutors() is always the method that will be invoked to do the actual destruction of the executors. I gather this mostly from the JavaDoc here.
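
For reference, a sketch of that split; the signatures approximate those of CoarseGrainedSchedulerBackend in the Spark 2.x line (they have changed across versions), so treat this as illustrative rather than the PR's actual code:

  // doRequestTotalExecutors only records the new target and asks for more pods if needed;
  // doKillExecutors receives the specific executor IDs Spark chose to remove.
  class SchedulerBackendScalingSketch {
    @volatile private var targetExecutors = 0

    protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
      targetExecutors = requestedTotal   // no executor pods are deleted here
      true
    }

    protected def doKillExecutors(executorIds: Seq[String]): Boolean = {
      executorIds.foreach(id => println(s"would delete the pod backing executor $id"))
      true
    }
  }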

8 - I haven't quite run into this yet; I haven't tested many cases with authentication and authorization in this prototype. Almost all of my experimentation has been via the KubernetesSuite test.

Regarding build and run, here's what I've been doing:

  1. Set up dependencies: mvn pre-integration-test -pl kubernetes/integration-tests -am -DskipTests -Pkubernetes -Pkubernetes-integration-tests
  2. Run KubernetesSuite from IntelliJ

If the code changes, particularly in KubernetesClusterSchedulerBackend, the command has to be re-run before running the test so that the Docker images are up to date.

@erikerlandson (Collaborator)

@mccheah

(keeping topic numbers)
(1) We have been more or less assuming a policy that only URIs accessible via curl (wget, etc.) from inside the cluster would be usable, chiefly on the grounds that supporting local files would require machinery for shipping them over the net 😁 I still think this is a reasonable policy in the name of keeping the implementation as lean as possible, but I'm also interested in any community feedback.

(2) I had been going with a "single image" design on the theory that it would make it easier for the Spark infra to build a single image (or for anybody else spinning their own). There being no free lunch, it means that there are shim scripts for each use case (driver.sh, executor.sh, etc.) that are expected to reside on the image. Even if function-specific images are supported, there's no reason a person couldn't simply provide the same image name for all of them if they wish to use a single-image design. I consider it an open question too. I haven't done much work trying to reduce image size; I expect some reduction is possible by manipulating the base image and Dockerfile logic.

(2a) I do advocate that we work via shim scripts, as that allows an important way to customize container startup logic. For example, when I run against OpenShift, I have special images with logic in the shim scripts that handles some cruft around running as non-root. Also, the shim scripts are what allow me to spin up a shuffle service just prior to executor startup.

(5) I've actually been running the shuffle service right in the same container as the executor (see 2a). So the life cycle is fairly clean, since the shuffle service shuts down with the container (and after the executor itself). The main potential drawback is that you are committing to one shuffle service per executor. I'm not sure that's bad (it's what you'd expect in a standalone cluster), but it would be nice to test the potential efficiencies of one shuffle service per physical node, if it can be made to work.

(7) With respect to doRequestTotalExecutors, my interpretation has been that it has the job of managing scale-down, as it's on the cluster backend, as opposed to the cluster manager. Also, I don't see executors being removed if I don't have shutdown logic there. I'm going to re-verify that to make sure I didn't miss something.

@mccheah (Collaborator, Author) commented Nov 23, 2016

@erikerlandson (also keeping topic numbers) Thanks for following up!

(1) YARN cluster mode supports uploading local files, but Mesos cluster mode does not. I'm expecting the use case of uploading local files to be important.

(2) The ease of building a single image is traded off against the complexity of the client code needing to specify the correct shim script to use in each case. I think the difference is in what the contract is expected to be: either the image is expected to be able to run a given script with a particular name, or the image runs an executor/driver "somehow". Given these two contracts, we can reason about what the user will have to look at if they want to do customization: where will they find the point of reference for what is done in the default case, so that they can build on it? In the case where the command is specified by the code, we either have to document explicitly what script we are invoking, or the user has to discover that our code is invoking driver.sh and executor.sh; the user also has to understand what those scripts do. On the other hand, when we leave the command to the Dockerfile, the Dockerfile contains all of the information about what is expected to be run in the default case. Keeping all of the information about what's being run explicitly in the Dockerfile makes it easier to trace what is going on, as opposed to needing to understand both the Dockerfile and the fact that our code is invoking specific scripts, and what those scripts do. However, I do expect the customization use case to be the exception and not the norm; we can follow up with the community on this topic.

(2a) The shim scripts aren't necessary for customizing container startup since the custom logic can be baked into the Dockerfile's command. For example I can tell the Dockerfile to run a script that eventually invokes the Java runtime, instead of the default Dockerfile's behavior of calling the Java runtime immediately.

(5) The shuffle service can't run in the same container as the executor, since the shuffle service needs to continue living even if the executor dies. In the case where the executor container also runs the shuffle service, if the executor container is shut down because of dynamic allocation, we don't want to lose the shuffle service process and its managed shuffle files. Thus the lifecycle of the shuffle service process needs to be completely separate from the lifecycle of the executors that run alongside it.

(7) doRequestTotalExecutors can't shut down executors because it's not given the list of executor IDs to shut down. Spark scales down executors intelligently; it will only shut down executors that have less data stored in their caches. I'm understanding the contract to be that when executors are being scaled down, doKillExecutors does the actual shutdown of the executors while doRequestTotalExecutors informs the cluster manager about the new expected number of executors.

@tnachen commented Nov 23, 2016

@erikerlandson I actually left a comment in your PR a while back about doRequestTotalExecutors: the call should only be about scaling up, not about removing any executors.

@erikerlandson (Collaborator)

@mccheah @tnachen
re: (7), I re-ran some experiments with a better test harness, and I've been wrong about this! I have a feeling my previous tests didn't leave any executors idle long enough for the relevant idle timeout to kick in. Looks like it defaults to 60 seconds. This time I could see it killing them with my scaledown code turned off.

@foxish (Owner) commented Nov 24, 2016

@erikerlandson

(5) I think the two alternatives could be: running the shuffle service in a different container within the executor pod, or using the daemonset. There was some discussion on how we could make this work with the least amount of wasted resources, when we shut down an executor. I'll try and update the prototype as well.
I think we also need to get performance numbers at some point soon, to compare.

@erikerlandson (Collaborator) commented Nov 24, 2016

re: (7), for posterity, the test harness I used this time is here

@tnachen commented Nov 25, 2016

@foxish what is the reason the performance between those two alternatives is expected to be different? I would assume that, whether it's a daemonset or within the pod, if the volume is somehow mounted in the executor pod, the performance should be the same?

@ash211 commented Nov 27, 2016

If you can share local disk between an executor pod and the shuffle service in both daemonset pods and in-pod-sidecar designs, then I'd expect read/write throughput to be the same. But to get to that point I think it's mostly a question of whether local disk can be shared between pods.

The wasted resources concern is a good one as well:

In the daemonset approach there could often be shuffle service pods running on hosts that aren't also running active executors, so those shuffle service pods would be unused during that time. That seems like wasting resources.

But in the sidecar approach, where the shuffle service container runs in the same pod as the executor container, there are other problems. During dynamic allocation of executors the desired executor count could shrink, ideally causing executor containers to shut down as well. But in the paired executor+shuffle containers in the pod design, that would leave some pods now running only the shuffle service container.

The purpose of the shuffle service (at least in YARN's design) is to hold shuffle state in a relatively durable place that isn't tied to the lifecycle of the executor.

So in the sidecar approach the design would need to account for shrinking executors while keeping shuffle services, and when increasing desired executor count would need to either A) add executors back into open shuffle service pods, or B) add brand new executor+shuffle service pods. I don't think the current k8s feature set supports (A), and going with (B) means

Bottom line is, there's not yet a clear best solution for how to integrate the shuffle service into the k8s cluster.

@erikerlandson (Collaborator)

The current situation is that I put the shuffle service in the same container as the executor, so you can't get into the funny state where a pod is running a shuffle service but no executor. When an executor (and its container) is killed by the scheduler, then by definition so is its shuffle service.

Daemon sets can also be configured to run on subsets of the entire cluster. I think executor pods might also be steered to nodes running a shuffle service, using labels.
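
A sketch of that label-steering idea, again assuming the fabric8 builder API and made-up label names; the daemonset would be restricted to labeled nodes, and executor pods would carry a matching nodeSelector so they land on those same nodes:

  import io.fabric8.kubernetes.api.model.PodBuilder
  import scala.collection.JavaConverters._

  // Pin executors to nodes labeled as running the shuffle-service daemonset.
  val executorOnShuffleNode = new PodBuilder()
    .withNewMetadata().withName("spark-executor-1").endMetadata()
    .withNewSpec()
      .withNodeSelector(Map("spark-shuffle-service" -> "enabled").asJava) // hypothetical label
      .addNewContainer()
        .withName("executor")
        .withImage("spark-executor:latest") // hypothetical image
        .endContainer()
      .endSpec()
    .build()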

@mccheah (Collaborator, Author) commented Nov 28, 2016

@erikerlandson we're going to run into bugs and errors if the shuffle service is killed along with the executor during dynamic allocation scheduling. The shuffle service is expected to continue running and have its disk preserved regardless of what happens to the executors.

@erikerlandson (Collaborator)

@mccheah is that still true if every executor is paired with exactly one shuffle service running in the same container? In my current setup, each shuffle service is dedicated to one executor. Killing it assumes that the single executor it was servicing was already idle, or that the entire app was finished and all executors are going down.

@mccheah (Collaborator, Author) commented Nov 28, 2016

My understanding is that the shuffle service has two main functions:

  1. Storing shuffle data from executors that run on its host, and
  2. Sending data to arbitrary executors that ask for it.

Storing shuffle data is implicit as the executors write to local disk and the external shuffle manager knows where the executors are writing to. After the data is stored on the shuffle service host's disk, executors will query the shuffle service directly to open blocks. The shuffle service thus can remain alive, with no executor running on its host, but still serving shuffle files requested by any of the remaining executors.

I haven't traced through everything end to end recently, but I'm gathering this from code in ShuffleBlockFetcherIterator, where the shuffle client's fetchBlocks() is given a specific host to read from. It appears that the passed host is arbitrary, depending on the location of the block that is needed by this iterator.
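
For context, a sketch of the call path being referred to; the signatures are recalled from the 2.x network-shuffle module and may not match this branch exactly. The point is that the fetcher is handed an arbitrary host/port/executor ID for wherever the block lives, so any surviving shuffle service on that host can answer, whether or not its local executor is still running:

  import org.apache.spark.network.buffer.ManagedBuffer
  import org.apache.spark.network.shuffle.{BlockFetchingListener, ExternalShuffleClient}

  // Ask the shuffle service on a given host for a set of shuffle blocks.
  def fetchFromShuffleService(client: ExternalShuffleClient,
                              host: String, port: Int, execId: String,
                              blockIds: Array[String]): Unit = {
    client.fetchBlocks(host, port, execId, blockIds, new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit =
        println(s"fetched $blockId (${data.size()} bytes)")
      override def onBlockFetchFailure(blockId: String, t: Throwable): Unit =
        println(s"failed to fetch $blockId: ${t.getMessage}")
    })
  }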

@ash211 commented Nov 28, 2016

My understanding of the data flow during a shuffle from executor to executor through a shuffle service for YARN's implementation is:

  1. executorA is running on a NodeManager host, with shuffleServiceA also running on that host inside the NM as an auxiliary service
  2. executorA writes shuffled data to local disk
  3. executorA notifies its colocated shuffleServiceA of that data's path on disk
  4. executorB requests a block from shuffleServiceA
  5. shuffleServiceA responds with that block from local disk

In this data flow executorA is able to shut down after step 3 and the job continues running without needing to recompute because the shuffle data is stored in shuffleServiceA which was not shut down.

In the design @erikerlandson mentions it sounds like executorA being idle would cause executorA and shuffleServiceA to shut down together. In that case if executorB hasn't yet safely retrieved the shuffled block from shuffleServiceA, then that data would need to be re-calculated which will have a significant negative impact on job progress.

shuffleServiceA's lifecycle is less dependent on what executorA is doing and more dependent on what executorB is doing.

@erikerlandson (Collaborator)

@ash211 @mccheah yeah I can see how there might be a race condition there, particularly if there were stragglers. By default there would be at least 60 seconds where other executors could access a shuffle service after said executor went idle. And the grace period could be increased with spark.dynamicAllocation.executorIdleTimeout to further reduce any performance hits from recomputing lost work.
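
For reference, the relevant settings; these keys are standard Spark dynamic-allocation configuration, and the timeout value below is just an example:

  import org.apache.spark.SparkConf

  // Lengthen the idle grace period so other executors have more time to pull shuffle
  // blocks from a sidecar shuffle service before its executor (and pod) goes away.
  val conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.executorIdleTimeout", "300s") // default is 60s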

(none of which should be construed as a lack of interest in pursuing shuffle service with daemon-sets)

In a sense, all of this is just increasing my interest in supporting dynamic executor scaling from the kube layer, which could side-step the need for running the shuffle service. If the kube layer wants to scale down, put some executors into a draining state using, say, the disableExecutor method, then remove them once they've been idle for some period.

@foxish (Owner) commented Nov 29, 2016

@foxish what is the reason the performance between those two alternatives is expected to be different? I would assume that, whether it's a daemonset or within the pod, if the volume is somehow mounted in the executor pod, the performance should be the same?

@tnachen I should have explained better.
I still think that using hostPath with a daemonset may not be ideal - the fact that it breaks out of the PV/pod-level storage abstractions to rely directly on node-level storage is dangerous. One alternative I've been thinking about would be to use PVs mounted as ReadWriteMany to share data between the daemonset pods and the executors. However, we would lose the locality that node-local storage (emptyDir, hostPath) offers. I'm unsure how much of a performance impact this would have, and that is what I intended to measure. Many kinds of disks, however, do not support ReadWriteMany, which is another issue. The ideal solution for storage, if we use DaemonSets, is sticky emptyDirs, which we don't have yet.
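
A sketch of the ReadWriteMany alternative described above, using the fabric8 builder API for illustration; the claim name, size, and the assumption that the backing storage class actually supports ReadWriteMany are all made up:

  import io.fabric8.kubernetes.api.model.{PersistentVolumeClaim, PersistentVolumeClaimBuilder, Quantity}

  // A single claim mounted by both the shuffle-service daemonset pods and the executors,
  // trading node-local performance for staying within pod-level storage abstractions.
  val sharedShuffleClaim: PersistentVolumeClaim = new PersistentVolumeClaimBuilder()
    .withNewMetadata().withName("spark-shuffle-data").endMetadata()
    .withNewSpec()
      .withAccessModes("ReadWriteMany")
      .withNewResources()
        .addToRequests("storage", new Quantity("20Gi"))
        .endResources()
      .endSpec()
    .build()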

@erikerlandson (Collaborator)

Just to add some context: past experience suggests that when it comes to getting code adopted upstream, smaller and leaner is better whenever possible. With that in mind, I'm approaching all the work with an eye toward preserving features while keeping the code as lean as possible.

I'm very interested in everybody's ideas on getting to a PR that provides the best functionality-to-size ratio.

To get the ball rolling upstream, I generated an "official" PR against Spark:
apache#16061

My thought was to use the "foxish" branch as a substrate for integrating everybody's work, because it is closer to an "MVP" implementation. I'm excited about acquiring additional features from @mccheah's #7 and @iyanuobidele's upcoming PR, etc.

printWarning(s"Master ${args.master} is deprecated since 2.0." +
" Please use master \"yarn\" with specified deploy mode instead.")
YARN
case m if m.startsWith("kubernetes") => KUBERNETES

We have been using k8s in the branch

import java.io.IOException;
import java.nio.ByteBuffer;

public class KubernetesExternalShuffleClient extends ExternalShuffleClient {

Where is this being used?
And can you comment on why we need a new K8s shuffle client?

@mccheah (Collaborator, Author):

I need to document the fuller context somewhere about how security with the external shuffle service via daemon sets and secrets works, but this particular shuffle service client is used when an application completes and needs to clean up the secret being used to communicate with the shuffle service.


I understand; however, I don't see it being used in this PR. Will another PR use this?

@mccheah (Collaborator, Author):

It should be in use in KubernetesClusterSchedulerBackend: https://github.com/foxish/spark/pull/7/files#diff-857cd4a3ee24d6110c51756c8a3f051fR475

var executorDockerImage: String = null
var customExecutorSpecFile: String = null
var customExecutorSpecContainerName: String = null
var kubernetesMaster: String = null

I think it's better to keep these out of SparkSubmit as much as possible; configurations are the preferred way, as otherwise SparkSubmit will become very huge.

@mccheah (Collaborator, Author):

YARN allows specifying some configurations via spark-submit arguments (e.g. queues), but it's unclear which options belong as spark-submit arguments and which belong as Spark configurations.

import scala.io.Source

private[spark] object KubernetesClientBuilder {
private val API_SERVER_TOKEN = new File("/var/run/secrets/kubernetes.io/serviceaccount/token")

Are these paths Kubernetes standard? Or are they just assumed here to exist?


Got it thanks!

@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/submitApplication")
def submitApplication(

I think if all we need this for is to upload files, we can introduce it separately and not couple it with the Spark REST interface.

@tnachen commented Nov 29, 2016

@foxish Ah I see, I didn't realize we're looking to use PVs here. The issue you referenced sounds like it would suit best here (similar to Mesos persistent volumes).

My opinion on the shuffle service is that the daemonset approach makes more sense, as the shuffle service is designed to run per node, besides the other discussions that have already happened. If we differ and get creative, we have to make sure that, down the road, no one on the Spark side does something that assumes everyone is running only one shuffle service per host, which could break this.

Since the shuffle service and dynamic allocation are not strictly required, I think our first PR doesn't really need to include them; we can punt on implementing them until the local emptyDir PV feature is merged, as there are plenty of other things to take care of.

val shuffleServiceForPod = kubernetesClient
.inNamespace(service.daemonSetNamespace)
.pods()
.inNamespace(service.daemonSetNamespace)
@mccheah (Collaborator, Author):

No need to specify namespace twice

status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match {
case Some(status) =>
try {
val driverLauncher = getDriverLauncherService(k8ClientConfig)

Couldn't we simplify this if we simply translate this into a client mode spark-submit call with k8s?

@mccheah (Collaborator, Author):

Do we mean perhaps submitting a pod that then itself invokes spark-submit for client mode? That could work, but we still need to provide the user's dependencies somehow.


Yes we need to basically translate the user dependencies and configuration into a downstream spark-submit call, but this allows a client mode implementation that doesn't need to consider how applications are managed/submitted, and drastically simplifies the cluster mode too.
This is exactly what the Mesos cluster scheduler does, and Standalone cluster mode does this too.

@mccheah (Collaborator, Author):

That seems reasonable, although we would still need some middle process to receive the dependencies from the user's local disk - I'm not sure we can invoke the spark-submit command on the pod itself; there's some wiring that needs to be done beforehand.

@ash211 commented Nov 30, 2016

@tnachen I'm not familiar with how the shuffle service works in Mesos -- can you describe that implementation and draw parallels if applicable to the k8s daemon set or sidecar designs? Is there a design doc that discusses it?

@vgkowski

I don't know Kubernetes very well, but I am used to Spark on Mesos. The only objective of the shuffle service is to make shuffle data survive executor shutdown. If their lifecycles are linked (in a pod or whatever), the shuffle service is useless because the executor can already manage shuffle data on its own, which is simpler.
We generally run one shuffle service per node, shared by Spark jobs (for simplicity); I haven't tested a dedicated shuffle service.

@mccheah (Collaborator, Author) commented Feb 22, 2017

Closing in favor of work being done on our other repository: https://github.com/apache-spark-on-k8s/spark

@mccheah closed this Feb 22, 2017