
Commit e99a8ba

Merge pull request apache-spark-on-k8s#104 from palantir/rebase-k8s-onto-palantir-spark
Bring first version of apache-spark-on-k8s into Palantir Spark
2 parents f762e6b + 2af1ada commit e99a8ba


49 files changed: +4591 −21 lines


.travis.yml

Lines changed: 1 addition & 1 deletion

@@ -44,7 +44,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkubernetes -Pkinesis-asl -Phive -Phive-thriftserver install
 
 # 6. Run lint-java.
 script:

README.md

Lines changed: 14 additions & 0 deletions

@@ -1,3 +1,17 @@
+# Palantir distribution of Apache Spark
+
+This repository, located at https://github.com/palantir/spark, contains a fork of Apache Spark. It includes a number of changes that are not yet on the Apache master branch that are useful at Palantir.
+
+Some highlights include:
+
+- kubernetes support, sourced from https://github.com/apache-spark-on-k8s/spark
+- predicate pushdown additions, including a patched version of Parquet
+- various misc bugfixes
+
+--------------------
+
+(original README below)
+
 # Apache Spark
 
 Spark is a fast and general cluster computing system for Big Data. It provides

assembly/pom.xml

Lines changed: 10 additions & 0 deletions

@@ -148,6 +148,16 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>kubernetes</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>hive</id>
       <dependencies>

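The new profile only pulls the spark-kubernetes module into the assembly when it is explicitly activated, so a build that should ship the Kubernetes scheduler backend must request it. A minimal sketch of such a build (the accompanying profile set is illustrative, not prescribed by this commit):

    build/mvn -T 4 -DskipTests -Pkubernetes -Pmesos -Pyarn -Phive package

This mirrors the .travis.yml change above, which adds -Pkubernetes to the CI build for the same reason.
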
core/src/main/resources/org/apache/spark/ui/static/executorspage.js

Lines changed: 34 additions & 0 deletions

@@ -54,7 +54,28 @@ $(document).ajaxStart(function () {
     $.blockUI({message: '<h3>Loading Executors Page...</h3>'});
 });
 
+function findKubernetesServiceBaseURI() {
+    var k8sProxyPattern = '/api/v1/proxy/namespaces/';
+    var k8sProxyPatternPos = document.baseURI.indexOf(k8sProxyPattern);
+    if (k8sProxyPatternPos > 0) {
+        // Spark is running in a kubernetes cluster, and the web ui is served
+        // through the kubectl proxy.
+        var remaining = document.baseURI.substr(k8sProxyPatternPos + k8sProxyPattern.length);
+        var urlSlashesCount = remaining.split('/').length - 3;
+        var words = document.baseURI.split('/');
+        var baseURI = words.slice(0, words.length - urlSlashesCount).join('/');
+        return baseURI;
+    }
+
+    return null;
+}
+
 function createTemplateURI(appId) {
+    var kubernetesBaseURI = findKubernetesServiceBaseURI();
+    if (kubernetesBaseURI) {
+        return kubernetesBaseURI + '/static/executorspage-template.html';
+    }
+
     var words = document.baseURI.split('/');
     var ind = words.indexOf("proxy");
     if (ind > 0) {
@@ -70,6 +91,14 @@ function createTemplateURI(appId) {
 }
 
 function getStandAloneppId(cb) {
+    var kubernetesBaseURI = findKubernetesServiceBaseURI();
+    if (kubernetesBaseURI) {
+        var appIdAndPort = kubernetesBaseURI.split('/').slice(-1)[0];
+        var appId = appIdAndPort.split(':')[0];
+        cb(appId);
+        return;
+    }
+
     var words = document.baseURI.split('/');
     var ind = words.indexOf("proxy");
     if (ind > 0) {
@@ -95,6 +124,11 @@ function getStandAloneppId(cb) {
 }
 
 function createRESTEndPoint(appId) {
+    var kubernetesBaseURI = findKubernetesServiceBaseURI();
+    if (kubernetesBaseURI) {
+        return kubernetesBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
+    }
+
     var words = document.baseURI.split('/');
     var ind = words.indexOf("proxy");
     if (ind > 0) {

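The new helper keys off the path prefix that kubectl proxy places in front of a proxied service URL. As a hypothetical illustration (namespace, service name, and port are made up; kubectl proxy listens on 8001 by default):

    kubectl proxy
    # The executors page is then loaded through a baseURI such as
    #   http://localhost:8001/api/v1/proxy/namespaces/default/services/spark-pi-driver:4040/
    # findKubernetesServiceBaseURI() trims this back to
    #   http://localhost:8001/api/v1/proxy/namespaces/default/services/spark-pi-driver:4040
    # getStandAloneppId() then takes the last path segment, "spark-pi-driver:4040",
    # and reports the part before the colon as the application id.

When the pattern is absent, all three functions fall through to the existing standalone/proxy URL handling.
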
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 25 additions & 6 deletions

@@ -69,7 +69,8 @@ object SparkSubmit extends CommandLineUtils {
   private val STANDALONE = 2
   private val MESOS = 4
   private val LOCAL = 8
-  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
+  private val KUBERNETES = 16
+  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | KUBERNETES | LOCAL
 
   // Deploy modes
   private val CLIENT = 1
@@ -229,9 +230,10 @@
         YARN
       case m if m.startsWith("spark") => STANDALONE
       case m if m.startsWith("mesos") => MESOS
+      case m if m.startsWith("k8s") => KUBERNETES
       case m if m.startsWith("local") => LOCAL
       case _ =>
-        printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
+        printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local")
         -1
     }
 
@@ -274,6 +276,7 @@
     }
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
+    val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
 
     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
@@ -320,6 +323,10 @@
 
     // The following modes are not supported or applicable
     (clusterManager, deployMode) match {
+      case (KUBERNETES, CLIENT) =>
+        printErrorAndExit("Client mode is currently not supported for Kubernetes.")
+      case (KUBERNETES, CLUSTER) if args.isPython || args.isR =>
+        printErrorAndExit("Kubernetes does not currently support python or R applications.")
       case (STANDALONE, CLUSTER) if args.isPython =>
         printErrorAndExit("Cluster deploy mode is currently not supported for python " +
           "applications on standalone clusters.")
@@ -453,17 +460,21 @@
       OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"),
       OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"),
 
-      // Other options
+      OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES,
+        sysProp = "spark.kubernetes.namespace"),
+
+      // Other options
       OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.cores"),
       OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.memory"),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
        sysProp = "spark.cores.max"),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        sysProp = "spark.files"),
       OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),
-      OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
+      OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
+        sysProp = "spark.jars"),
       OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
        sysProp = "spark.driver.memory"),
       OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
@@ -496,8 +507,9 @@
 
     // Add the application jar automatically so the user doesn't have to call sc.addJar
     // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
+    // In Kubernetes cluster mode, the jar will be uploaded by the client separately.
     // For python and R files, the primary resource is already distributed as a regular file
-    if (!isYarnCluster && !args.isPython && !args.isR) {
+    if (!isYarnCluster && !isKubernetesCluster && !args.isPython && !args.isR) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
       if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
@@ -596,6 +608,13 @@
       }
     }
 
+    if (isKubernetesCluster) {
+      childMainClass = "org.apache.spark.deploy.kubernetes.Client"
+      childArgs += args.primaryResource
+      childArgs += args.mainClass
+      childArgs ++= args.childArgs
+    }
+
     // Load any properties specified through --conf and the default properties file
     for ((k, v) <- args.sparkProperties) {
       sysProps.getOrElseUpdate(k, v)

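Taken together, these changes let spark-submit recognize a Kubernetes master and hand the launch off to the new client. A hypothetical invocation, sketched only from what this diff establishes (the "k8s" master prefix, cluster-only deploy mode, and the spark.kubernetes.namespace property; the full master URL format and any image-related settings belong to the kubernetes module and are assumptions here):

    bin/spark-submit \
      --master k8s://https://kubernetes-api.example.com:6443 \
      --deploy-mode cluster \
      --class org.apache.spark.examples.SparkPi \
      --conf spark.kubernetes.namespace=spark-jobs \
      path/to/spark-examples.jar 1000

SparkSubmit maps the master to KUBERNETES, rejects client mode and Python/R submissions, routes the namespace through an OptionAssigner, and sets org.apache.spark.deploy.kubernetes.Client as childMainClass, passing along the primary resource, main class, and application arguments.
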
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 9 additions & 0 deletions

@@ -71,6 +71,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var principal: String = null
   var keytab: String = null
 
+  // Kubernetes only
+  var kubernetesNamespace: String = null
+
   // Standalone cluster mode only
   var supervise: Boolean = false
   var driverCores: String = null
@@ -186,6 +189,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
     keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
     principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
+    kubernetesNamespace = Option(kubernetesNamespace)
+      .orElse(sparkProperties.get("spark.kubernetes.namespace"))
+      .orNull
 
     // Try to set main class from JAR if no --class argument is given
     if (mainClass == null && !isPython && !isR && primaryResource != null) {
@@ -424,6 +430,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case KEYTAB =>
         keytab = value
 
+      case KUBERNETES_NAMESPACE =>
+        kubernetesNamespace = value
+
       case HELP =>
         printUsageAndExit(0)

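The namespace follows the same precedence pattern as the YARN keytab and principal above: a value captured during argument parsing wins, and otherwise it is filled in from the loaded Spark properties. As an illustration (file contents hypothetical), a default can live in the properties file and still be overridden for a single submission:

    # conf/spark-defaults.conf
    spark.kubernetes.namespace   analytics

    # overridden for one run
    bin/spark-submit --conf spark.kubernetes.namespace=staging ...
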
core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala

Lines changed: 27 additions & 13 deletions

@@ -19,16 +19,16 @@ package org.apache.spark.deploy.rest
 
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
-import scala.io.Source
-
 import com.fasterxml.jackson.core.JsonProcessingException
-import org.eclipse.jetty.server.{HttpConnectionFactory, Server, ServerConnector}
+import org.eclipse.jetty.http.HttpVersion
+import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory}
 import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
+import scala.io.Source
 
-import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SSLOptions}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
@@ -50,7 +50,8 @@ import org.apache.spark.util.Utils
 private[spark] abstract class RestSubmissionServer(
     val host: String,
     val requestedPort: Int,
-    val masterConf: SparkConf) extends Logging {
+    val masterConf: SparkConf,
+    val sslOptions: SSLOptions = SSLOptions()) extends Logging {
   protected val submitRequestServlet: SubmitRequestServlet
   protected val killRequestServlet: KillRequestServlet
   protected val statusRequestServlet: StatusRequestServlet
@@ -79,19 +80,32 @@
    * Return a 2-tuple of the started server and the bound port.
    */
   private def doStart(startPort: Int): (Server, Int) = {
+    // TODO consider using JettyUtils#startServer to do this instead
     val threadPool = new QueuedThreadPool
     threadPool.setDaemon(true)
     val server = new Server(threadPool)
 
+    val resolvedConnectionFactories = sslOptions
+      .createJettySslContextFactory()
+      .map(sslFactory => {
+        val sslConnectionFactory = new SslConnectionFactory(
+          sslFactory, HttpVersion.HTTP_1_1.asString())
+        val rawHttpConfiguration = new HttpConfiguration()
+        rawHttpConfiguration.setSecureScheme("https")
+        rawHttpConfiguration.setSecurePort(startPort)
+        val rawHttpConnectionFactory = new HttpConnectionFactory(rawHttpConfiguration)
+        Array(sslConnectionFactory, rawHttpConnectionFactory)
+      }).getOrElse(Array(new HttpConnectionFactory()))
+
     val connector = new ServerConnector(
-      server,
-      null,
-      // Call this full constructor to set this, which forces daemon threads:
-      new ScheduledExecutorScheduler("RestSubmissionServer-JettyScheduler", true),
-      null,
-      -1,
-      -1,
-      new HttpConnectionFactory())
+        server,
+        null,
+        // Call this full constructor to set this, which forces daemon threads:
+        new ScheduledExecutorScheduler("RestSubmissionServer-JettyScheduler", true),
+        null,
+        -1,
+        -1,
+        resolvedConnectionFactories: _*)
     connector.setHost(host)
     connector.setPort(startPort)
     server.addConnector(connector)

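The default SSLOptions() has SSL disabled, so createJettySslContextFactory() returns None and the connector keeps serving plain HTTP exactly as before; only a caller that supplies a populated SSLOptions gets a TLS connector (an SslConnectionFactory chained in front of HTTP/1.1 on the same port). Spark's SSLOptions is conventionally built from spark.ssl.*-style properties, so a secured submission server would be driven by settings along these lines (illustrative only; which configuration namespace the Kubernetes submission client actually reads is not shown in this commit):

    --conf spark.ssl.enabled=true \
    --conf spark.ssl.keyStore=/path/to/keystore.jks \
    --conf spark.ssl.keyStorePassword=changeit \
    --conf spark.ssl.keyPassword=changeit
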
dev/make-distribution.sh

Lines changed: 7 additions & 0 deletions

@@ -171,6 +171,13 @@ echo "Build flags: $@" >> "$DISTDIR/RELEASE"
 # Copy jars
 cp "$SPARK_HOME"/assembly/target/scala*/jars/* "$DISTDIR/jars/"
 
+# Copy docker files
+mkdir -p "$DISTDIR/dockerfiles/driver"
+mkdir -p "$DISTDIR/dockerfiles/executor"
+DOCKERFILES_SRC="$SPARK_HOME/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker"
+cp "$DOCKERFILES_SRC/driver/Dockerfile" "$DISTDIR/dockerfiles/driver/Dockerfile"
+cp "$DOCKERFILES_SRC/executor/Dockerfile" "$DISTDIR/dockerfiles/executor/Dockerfile"
+
 # Only create the yarn directory if the yarn artifacts were build.
 if [ -f "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar ]; then
   mkdir "$DISTDIR"/yarn

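A distribution built with the kubernetes profile therefore ships both Dockerfiles next to the jars. A sketch of how they might be consumed (the --name value and image tags are hypothetical, and the Dockerfiles are assumed to take the distribution root as their build context):

    ./dev/make-distribution.sh --name kubernetes --tgz -Pkubernetes -Pmesos -Pyarn -Phive
    cd dist
    docker build -t registry.example.com/spark-driver:latest -f dockerfiles/driver/Dockerfile .
    docker build -t registry.example.com/spark-executor:latest -f dockerfiles/executor/Dockerfile .
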
dev/scalastyle

Lines changed: 2 additions & 0 deletions

@@ -26,6 +26,8 @@ ERRORS=$(echo -e "q\n" \
     -Pyarn \
     -Phive \
     -Phive-thriftserver \
+    -Pkubernetes \
+    -Pkubernetes-integration-tests \
     scalastyle test:scalastyle \
     | awk '{if($1~/error/)print}' \
 )

docs/_layouts/global.html

Lines changed: 1 addition & 0 deletions

@@ -99,6 +99,7 @@
     <li><a href="spark-standalone.html">Spark Standalone</a></li>
     <li><a href="running-on-mesos.html">Mesos</a></li>
     <li><a href="running-on-yarn.html">YARN</a></li>
+    <li><a href="running-on-kubernetes.html">Kubernetes</a></li>
 </ul>
 </li>
