This repository was archived by the owner on Jan 9, 2020. It is now read-only.
41 commits
7935ddf
[SPARK-18278] Minimal support for submitting to Kubernetes.
mccheah Dec 6, 2016
4f916a0
Fix style
mccheah Dec 6, 2016
69bec9d
Make naming more consistent
mccheah Dec 7, 2016
7cd9b57
Fix building assembly with Kubernetes.
mccheah Dec 9, 2016
47ff356
Service account support, use constants from fabric8 library.
mccheah Dec 10, 2016
bf1db11
Some small changes
mccheah Jan 7, 2017
32ef03a
Use k8s:// formatted URL instead of separate setting.
mccheah Jan 9, 2017
23bfbc1
Reindent comment to conform to JavaDoc style
foxish Jan 9, 2017
16e5420
Move kubernetes under resource-managers folder.
mccheah Jan 9, 2017
7ae3add
Use tar and gzip to compress+archive shipped jars (#2)
mccheah Jan 11, 2017
3085d94
Use alpine and java 8 for docker images. (#10)
mccheah Jan 12, 2017
a736e7a
Copy the Dockerfiles from docker-minimal-bundle into the distribution…
mccheah Jan 12, 2017
c154065
inherit IO (#13)
foxish Jan 12, 2017
011284f
Error messages when the driver container fails to start. (#11)
mccheah Jan 13, 2017
8db46e2
Fix linter error to make CI happy (#18)
foxish Jan 13, 2017
0dd5b3a
Documentation for the current state of the world (#16)
mccheah Jan 13, 2017
39bf95a
Development workflow documentation for the current state of the world…
mccheah Jan 13, 2017
28562e2
Added service name as prefix to executor pods (#14)
foxish Jan 13, 2017
b2485e5
Add kubernetes profile to travis CI yml file (#21)
kimoonkim Jan 14, 2017
d3116ea
Improved the example commands in running-on-k8s document. (#25)
lins05 Jan 17, 2017
94a09aa
Fix spacing for command highlighting (#31)
foxish Jan 18, 2017
70e36af
Support custom labels on the driver pod. (#27)
mccheah Jan 19, 2017
da6723c
Make pod name unique using the submission timestamp (#32)
foxish Jan 19, 2017
4f6f42f
A number of small tweaks to the MVP. (#23)
mccheah Jan 24, 2017
7834850
Correct hadoop profile: hadoop2.7 -> hadoop-2.7 (#41)
ash211 Jan 25, 2017
7cd0762
Support setting the driver pod launching timeout. (#36)
lins05 Jan 25, 2017
c808f09
Sanitize kubernetesAppId for use in secret, service, and pod names (#45)
ash211 Jan 25, 2017
99d0155
Support spark.driver.extraJavaOptions (#48)
kimoonkim Jan 26, 2017
7f72ee6
Use "extraScalaTestArgs" to pass extra options to scalatest. (#52)
lins05 Jan 26, 2017
5e301a2
Use OpenJDK8's official Alpine image. (#51)
mccheah Jan 26, 2017
b15bd55
Remove unused driver extra classpath upload code (#54)
mccheah Jan 26, 2017
d9ad123
Fix k8s integration tests (#44)
lins05 Jan 27, 2017
5799783
Added GC to components (#56)
foxish Jan 27, 2017
b8db05c
Create README to better describe project purpose (#50)
ash211 Jan 28, 2017
5e5c9d2
Access the Driver Launcher Server over NodePort for app launch + subm…
mccheah Jan 30, 2017
ad22920
Extract constants and config into separate file. Launch => Submit. (#65)
mccheah Jan 31, 2017
b30bb2b
Retry the submit-application request to multiple nodes (#69)
mccheah Feb 2, 2017
aba180d
Allow adding arbitrary files (#71)
mccheah Feb 2, 2017
0ce85c3
Fix NPE around unschedulable pod specs (#79)
ash211 Feb 2, 2017
ed21e96
Introduce blocking submit to kubernetes by default (#53)
ash211 Feb 3, 2017
c92683f
Do not wait for pod finishing in integration tests. (#84)
lins05 Feb 3, 2017
2 changes: 1 addition & 1 deletion .travis.yml
@@ -44,7 +44,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkubernetes -Pkinesis-asl -Phive -Phive-thriftserver install

# 6. Run lint-java.
script:
38 changes: 38 additions & 0 deletions README.md
@@ -1,3 +1,41 @@
# Apache Spark On Kubernetes

This repository, located at https://github.com/apache-spark-on-k8s/spark, contains a fork of Apache Spark that enables running Spark jobs natively on a Kubernetes cluster.

## What is this?

This is a collaboratively maintained project working on [SPARK-18278](https://issues.apache.org/jira/browse/SPARK-18278). The goal is to add native support for using Kubernetes as a cluster manager for Spark, fully supported and on par with the Spark Standalone, Mesos, and Apache YARN cluster managers.

## Getting Started

- [Usage guide](docs/running-on-kubernetes.md) shows how to run the code
- [Development docs](resource-managers/kubernetes/README.md) shows how to get set up for development
- Code is primarily located in the [resource-managers/kubernetes](resource-managers/kubernetes) folder

## Why does this fork exist?

Adding native integration for a new cluster manager is a large undertaking. If poorly executed, it could introduce bugs into Spark when run on other cluster managers, cause release blockers slowing down the overall Spark project, or require hotfixes which divert attention away from development towards managing additional releases. Any work this deep inside Spark needs to be done carefully to minimize the risk of those negative externalities.

At the same time, an increasing number of people from various companies and organizations desire to work together to natively run Spark on Kubernetes. The group needs a code repository, communication forum, issue tracking, and continuous integration, all in order to work together effectively on an open source product.

We've been asked by an Apache Spark Committer to work outside of the Apache infrastructure for a short period of time to allow this feature to be hardened and improved without creating risk for Apache Spark. The aim is to rapidly bring it to the point where it can be brought into the mainline Apache Spark repository for continued development within the Apache umbrella. If all goes well, this should be a short-lived fork rather than a long-lived one.

## Who are we?

This is a collaborative effort by several people from different companies who are interested in seeing this feature succeed. Companies active in this project include (alphabetically):

- Google
- Haiwen
- Hyperpilot
- Intel
- Palantir
- Pepperdata
- Red Hat

--------------------

(original README below)

# Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -148,6 +148,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>kubernetes</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
30 changes: 26 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -70,7 +70,8 @@ object SparkSubmit {
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | KUBERNETES | LOCAL
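The cluster managers are encoded as powers of two so that the applicability of an option can be expressed as a bitmask (as the `OptionAssigner` calls further down do). A small Python sketch of the same scheme, with the new `KUBERNETES` flag included:

```python
# Bit flags mirroring SparkSubmit's cluster-manager constants in this diff.
YARN, STANDALONE, MESOS, LOCAL, KUBERNETES = 1, 2, 4, 8, 16
ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | KUBERNETES | LOCAL

def applies_to(option_mask: int, cluster_manager: int) -> bool:
    """An option applies when its mask has the manager's bit set."""
    return (option_mask & cluster_manager) != 0

# KUBERNETES is now part of the all-managers mask:
assert applies_to(ALL_CLUSTER_MGRS, KUBERNETES)
```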

// Deploy modes
private val CLIENT = 1
@@ -239,9 +240,10 @@ object SparkSubmit {
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}
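The match above resolves the cluster manager from the prefix of the master URL; a `k8s://`-prefixed master now selects Kubernetes. An illustrative Python equivalent of the same dispatch (the function name is ours, not Spark's):

```python
def resolve_cluster_manager(master: str) -> str:
    # Mirrors the prefix match in SparkSubmit.prepareSubmitEnvironment:
    # "yarn" is matched directly, the rest by URL prefix.
    if master == "yarn":
        return "YARN"
    if master.startswith("spark"):
        return "STANDALONE"
    if master.startswith("mesos"):
        return "MESOS"
    if master.startswith("k8s"):
        return "KUBERNETES"
    if master.startswith("local"):
        return "LOCAL"
    raise ValueError(
        "Master must either be yarn or start with spark, mesos, k8s, or local")

assert resolve_cluster_manager("k8s://https://192.168.99.100:8443") == "KUBERNETES"
```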

@@ -284,6 +286,7 @@ object SparkSubmit {
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER

// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
@@ -330,6 +333,10 @@ object SparkSubmit {

// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (KUBERNETES, CLUSTER) if args.isPython || args.isR =>
printErrorAndExit("Kubernetes does not currently support python or R applications.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
@@ -463,7 +470,14 @@ object SparkSubmit {
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"),

// Other options
OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES,
sysProp = "spark.kubernetes.namespace"),
OptionAssigner(args.kubernetesUploadJars, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.driver.uploads.jars"),
OptionAssigner(args.kubernetesUploadFiles, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.driver.uploads.files"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
@@ -506,8 +520,9 @@

// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// In Kubernetes cluster mode, the jar will be uploaded by the client separately.
// For python and R files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !args.isPython && !args.isR) {
if (!isYarnCluster && !isKubernetesCluster && !args.isPython && !args.isR) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
@@ -606,6 +621,13 @@
}
}

if (isKubernetesCluster) {
childMainClass = "org.apache.spark.deploy.kubernetes.Client"
childArgs += args.primaryResource
childArgs += args.mainClass
childArgs ++= args.childArgs
}
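In Kubernetes cluster mode, the child process runs the new Kubernetes `Client`, with the primary resource and main class prepended to the user's own arguments. A Python sketch of how the child invocation is assembled (names are illustrative):

```python
def kubernetes_child_invocation(primary_resource, main_class, user_args):
    """Return (child main class, child argument list) for a
    Kubernetes cluster-mode submission, mirroring the diff above."""
    child_main_class = "org.apache.spark.deploy.kubernetes.Client"
    # childArgs += args.primaryResource; += args.mainClass; ++= args.childArgs
    child_args = [primary_resource, main_class, *user_args]
    return child_main_class, child_args
```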

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -71,6 +71,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var principal: String = null
var keytab: String = null

// Kubernetes only
var kubernetesNamespace: String = null
var kubernetesUploadJars: String = null
var kubernetesUploadFiles: String = null

// Standalone cluster mode only
var supervise: Boolean = false
var driverCores: String = null
@@ -186,6 +191,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
.getOrElse(sparkProperties.get("spark.executor.instances").orNull)
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
kubernetesNamespace = Option(kubernetesNamespace)
.orElse(sparkProperties.get("spark.kubernetes.namespace"))
.orNull
kubernetesUploadJars = Option(kubernetesUploadJars)
.orElse(sparkProperties.get("spark.kubernetes.driver.uploads.jars"))
.orNull
kubernetesUploadFiles = Option(kubernetesUploadFiles)
.orElse(sparkProperties.get("spark.kubernetes.driver.uploads.files"))
.orNull
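Each new Kubernetes flag falls back to its corresponding `spark.kubernetes.*` property when it was not given on the command line, following the `Option(...).orElse(sparkProperties.get(...)).orNull` pattern above. The precedence rule, sketched in Python:

```python
def resolve_setting(cli_value, spark_properties, key):
    """CLI value wins; otherwise fall back to the spark property;
    otherwise None (the Scala code uses .orNull for the same effect)."""
    if cli_value is not None:
        return cli_value
    return spark_properties.get(key)

props = {"spark.kubernetes.namespace": "staging"}
# Property applies only when no CLI value was supplied:
assert resolve_setting(None, props, "spark.kubernetes.namespace") == "staging"
assert resolve_setting("prod", props, "spark.kubernetes.namespace") == "prod"
```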

// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
@@ -426,6 +440,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case KEYTAB =>
keytab = value

case KUBERNETES_NAMESPACE =>
kubernetesNamespace = value

case KUBERNETES_UPLOAD_JARS =>
kubernetesUploadJars = value

case KUBERNETES_UPLOAD_FILES =>
kubernetesUploadFiles = value

case HELP =>
printUsageAndExit(0)

core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -19,16 +19,16 @@ package org.apache.spark.deploy.rest

import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import scala.io.Source

import com.fasterxml.jackson.core.JsonProcessingException
import org.eclipse.jetty.server.{HttpConnectionFactory, Server, ServerConnector}
import org.eclipse.jetty.http.HttpVersion
import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory}
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.io.Source

import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SSLOptions}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

@@ -50,7 +50,8 @@ import org.apache.spark.util.Utils
private[spark] abstract class RestSubmissionServer(
val host: String,
val requestedPort: Int,
val masterConf: SparkConf) extends Logging {
val masterConf: SparkConf,
val sslOptions: SSLOptions = SSLOptions()) extends Logging {
protected val submitRequestServlet: SubmitRequestServlet
protected val killRequestServlet: KillRequestServlet
protected val statusRequestServlet: StatusRequestServlet
@@ -79,19 +80,32 @@ private[spark] abstract class RestSubmissionServer(
* Return a 2-tuple of the started server and the bound port.
*/
private def doStart(startPort: Int): (Server, Int) = {
// TODO consider using JettyUtils#startServer to do this instead
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
val server = new Server(threadPool)

val resolvedConnectionFactories = sslOptions
.createJettySslContextFactory()
.map(sslFactory => {
val sslConnectionFactory = new SslConnectionFactory(
sslFactory, HttpVersion.HTTP_1_1.asString())
val rawHttpConfiguration = new HttpConfiguration()
rawHttpConfiguration.setSecureScheme("https")
rawHttpConfiguration.setSecurePort(startPort)
val rawHttpConnectionFactory = new HttpConnectionFactory(rawHttpConfiguration)
Array(sslConnectionFactory, rawHttpConnectionFactory)
}).getOrElse(Array(new HttpConnectionFactory()))

val connector = new ServerConnector(
server,
null,
// Call this full constructor to set this, which forces daemon threads:
new ScheduledExecutorScheduler("RestSubmissionServer-JettyScheduler", true),
null,
-1,
-1,
new HttpConnectionFactory())
server,
null,
// Call this full constructor to set this, which forces daemon threads:
new ScheduledExecutorScheduler("RestSubmissionServer-JettyScheduler", true),
null,
-1,
-1,
resolvedConnectionFactories: _*)
connector.setHost(host)
connector.setPort(startPort)
server.addConnector(connector)
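The connector's factory list now depends on whether the SSL options yield a Jetty SSL context factory: with SSL, the connection is layered as an SSL factory wrapping an HTTP/1.1 factory; without it, a single plain HTTP factory is used, as before. A schematic Python version of that selection (strings stand in for the Jetty factory objects):

```python
def connection_factories(ssl_context_factory):
    """Pick the connection factories for the Jetty ServerConnector.
    Order matters: the SSL factory must come before the HTTP one so
    that TLS is terminated before HTTP parsing."""
    if ssl_context_factory is not None:
        return ["ssl", "http/1.1"]
    return ["http/1.1"]

assert connection_factories(None) == ["http/1.1"]
```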
7 changes: 7 additions & 0 deletions dev/make-distribution.sh
@@ -175,6 +175,13 @@ echo "Build flags: $@" >> "$DISTDIR/RELEASE"
# Copy jars
cp "$SPARK_HOME"/assembly/target/scala*/jars/* "$DISTDIR/jars/"

# Copy docker files
mkdir -p "$DISTDIR/dockerfiles/driver"
mkdir -p "$DISTDIR/dockerfiles/executor"
DOCKERFILES_SRC="$SPARK_HOME/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker"
cp "$DOCKERFILES_SRC/driver/Dockerfile" "$DISTDIR/dockerfiles/driver/Dockerfile"
cp "$DOCKERFILES_SRC/executor/Dockerfile" "$DISTDIR/dockerfiles/executor/Dockerfile"

# Only create the yarn directory if the yarn artifacts were built.
if [ -f "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar ]; then
mkdir "$DISTDIR"/yarn
2 changes: 2 additions & 0 deletions dev/scalastyle
@@ -26,6 +26,8 @@ ERRORS=$(echo -e "q\n" \
-Pyarn \
-Phive \
-Phive-thriftserver \
-Pkubernetes \
-Pkubernetes-integration-tests \
scalastyle test:scalastyle \
| awk '{if($1~/error/)print}' \
)
1 change: 1 addition & 0 deletions docs/_layouts/global.html
@@ -99,6 +99,7 @@
<li><a href="spark-standalone.html">Spark Standalone</a></li>
<li><a href="running-on-mesos.html">Mesos</a></li>
<li><a href="running-on-yarn.html">YARN</a></li>
<li><a href="running-on-kubernetes.html">Kubernetes</a></li>
</ul>
</li>

1 change: 1 addition & 0 deletions docs/index.md
@@ -113,6 +113,7 @@ options for deployment:
* [Mesos](running-on-mesos.html): deploy a private cluster using
[Apache Mesos](http://mesos.apache.org)
* [YARN](running-on-yarn.html): deploy Spark on top of Hadoop NextGen (YARN)
* [Kubernetes](running-on-kubernetes.html): deploy Spark on top of Kubernetes

**Other Documents:**
