diff --git a/assembly/pom.xml b/assembly/pom.xml index ec243eaebaea7..db8ac768a877f 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -148,6 +148,16 @@ + <profile> + <id>kubernetes</id> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-kubernetes_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </profile> <id>hive</id> diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 85f80b6971e80..3291a1e69fea0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -69,7 +69,8 @@ object SparkSubmit extends CommandLineUtils { private val STANDALONE = 2 private val MESOS = 4 private val LOCAL = 8 - private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL + private val KUBERNETES = 16 + private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | KUBERNETES | LOCAL // Deploy modes private val CLIENT = 1 @@ -230,8 +231,9 @@ object SparkSubmit extends CommandLineUtils { case m if m.startsWith("spark") => STANDALONE case m if m.startsWith("mesos") => MESOS case m if m.startsWith("local") => LOCAL + case m if m.startsWith("k8s") => KUBERNETES case _ => - printErrorAndExit("Master must either be yarn or start with spark, mesos, local") + printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, local") -1 } @@ -274,6 +276,7 @@ object SparkSubmit extends CommandLineUtils { } val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER + val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files // too for packages that include Python code @@ -596,6 +599,26 @@ object SparkSubmit extends CommandLineUtils { } } + if (isKubernetesCluster) { + childMainClass = "org.apache.spark.deploy.kubernetes.Client" + if (args.isPython) { + childArgs += ("--primary-py-file", args.primaryResource) + childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") + } else if (args.isR) { + val mainFile = new Path(args.primaryResource).getName + childArgs += ("--primary-r-file", mainFile) + childArgs += ("--class", "org.apache.spark.deploy.RRunner") + } else { + if (args.primaryResource != SparkLauncher.NO_RESOURCE) { + childArgs += ("--jar", args.primaryResource) + } + childArgs += ("--class", args.mainClass) + } + if (args.childArgs != null) { + args.childArgs.foreach { arg => childArgs += ("--arg", arg) } + } + } + // Load any properties specified through --conf and the default properties file for ((k, v) <- args.sparkProperties) { sysProps.getOrElseUpdate(k, v) diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 index 71e7fb6dd243d..2f95e359d9daf 100644 --- a/dev/deps/spark-deps-hadoop-2.4 +++ b/dev/deps/spark-deps-hadoop-2.4 @@ -98,7 +98,7 @@ jersey-client-2.22.2.jar jersey-common-2.22.2.jar jersey-container-servlet-2.22.2.jar jersey-container-servlet-core-2.22.2.jar -jersey-guava-2.22.2.jar +jersey-guava-2.22.2.jarshaded-proto jersey-media-jaxb-2.22.2.jar jersey-server-2.22.2.jar jets3t-0.9.3.jar diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh index 49b46fbc3fb27..1a92bd43a1a65 100755 --- a/dev/make-distribution.sh +++ b/dev/make-distribution.sh @@ -154,7 +154,9 @@ export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCac # Store the command as an array because $MVN variable might have spaces in it.
# Normal quoting tricks don't work. # See: http://mywiki.wooledge.org/BashFAQ/050 -BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@) +# BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@) + +BUILD_COMMAND=("$MVN" -T 2C package -DskipTests $@) # Actually build the jar echo -e "\nBuilding with..." diff --git a/kubernetes/README.md b/kubernetes/README.md new file mode 100644 index 0000000000000..a93a6cd2c6e27 --- /dev/null +++ b/kubernetes/README.md @@ -0,0 +1,21 @@ +# Prerequisites +* Maven, a JDK, and all other prerequisites for building Spark. + +# Steps to compile + +* Clone the fork of Spark at https://github.com/foxish/spark/ and switch to the k8s-support branch. +* Build the project: + * `./build/mvn -Pkubernetes -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests package` +* Ensure that `kubectl` is pointing to the Kubernetes cluster you want to use with Spark (check with `kubectl config current-context`). +* Launch a spark-submit job: + * `./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master k8s://default --conf spark.executor.instances=5 --conf spark.kubernetes.sparkImage=manyangled/kube-spark:dynamic http://storage.googleapis.com/foxish-spark-distro/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar 10000` + * Optionally, the following configuration parameters can be supplied to spark-submit through additional `--conf` arguments (or a properties file); see the example at the end of this list. + * `spark.kubernetes.serviceAccountName` (defaults to "default") + * `spark.kubernetes.namespace` (defaults to "default"). The namespace must exist before spark-submit is launched. + * The image used above is built from https://github.com/erikerlandson/openshift-spark. + * `--master k8s://default` makes spark-submit pick up the correct API server, i.e. the default from the current kubectl context. +* Check that pods are being created, and watch the driver pod's logs with `kubectl logs -f <driver-pod-name>`. +* If the cloud/infrastructure provider allows external load balancers to be provisioned, an external IP is allocated to the service associated with the driver, and the Spark application UI can be accessed at that IP address on port 4040.
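+* As an example of supplying the optional settings above, the same SparkPi submission with an explicit namespace and service account might look like the following (illustrative values only: the `spark-demo` namespace and `spark` service account are assumptions, not defaults shipped with this change): + * `./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master k8s://default --conf spark.kubernetes.namespace=spark-demo --conf spark.kubernetes.serviceAccountName=spark --conf spark.executor.instances=5 --conf spark.kubernetes.sparkImage=manyangled/kube-spark:dynamic http://storage.googleapis.com/foxish-spark-distro/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar 10000`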
+ + ![spark-submit](spark-submit.png) diff --git a/kubernetes/pom.xml b/kubernetes/pom.xml new file mode 100644 index 0000000000000..7183afc625562 --- /dev/null +++ b/kubernetes/pom.xml @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.11</artifactId> + <version>2.1.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>spark-kubernetes_2.11</artifactId> + <packaging>jar</packaging> + <name>Spark Project Kubernetes</name> + <properties> + <sbt.project.name>kubernetes</sbt.project.name> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kubernetes-client</artifactId> + <version>1.4.8</version> + </dependency> + </dependencies> + + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + </build> +</project> \ No newline at end of file diff --git a/kubernetes/spark-submit.png b/kubernetes/spark-submit.png new file mode 100644 index 0000000000000..0776549f4c7d6 Binary files /dev/null and b/kubernetes/spark-submit.png differ diff --git a/kubernetes/src/main/resources/META-INF/MANIFEST.MF b/kubernetes/src/main/resources/META-INF/MANIFEST.MF new file mode 100644 index 0000000000000..1692acd078605 --- /dev/null +++ b/kubernetes/src/main/resources/META-INF/MANIFEST.MF @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Main-Class: org.apache.spark.deploy.kubernetes.Client + diff --git a/kubernetes/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/kubernetes/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager new file mode 100644 index 0000000000000..f477a24d191c3 --- /dev/null +++ b/kubernetes/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager @@ -0,0 +1 @@ +org.apache.spark.scheduler.cluster.kubernetes.KubernetesClusterManager \ No newline at end of file diff --git a/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala new file mode 100644 index 0000000000000..a83a6c98bfa36 --- /dev/null +++ b/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.deploy.kubernetes + +import java.util.concurrent.CountDownLatch + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.cluster.kubernetes.KubernetesClusterScheduler +import org.apache.spark.util.ShutdownHookManager + +private[spark] class Client(val args: ClientArguments, + val hadoopConf: Configuration, + val sparkConf: SparkConf) + extends Logging { + private val scheduler = new KubernetesClusterScheduler(sparkConf) + private val shutdownLatch = new CountDownLatch(1) + + def this(clientArgs: ClientArguments, spConf: SparkConf) = + this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) + + def start(): Unit = { + scheduler.start(args) + } + + def stop(): Unit = { + scheduler.stop() + shutdownLatch.countDown() + System.clearProperty("SPARK_KUBERNETES_MODE") + } + + def awaitShutdown(): Unit = { + shutdownLatch.await() + } +} + +private object Client extends Logging { + def main(argStrings: Array[String]) { + val sparkConf = new SparkConf + System.setProperty("SPARK_KUBERNETES_MODE", "true") + val args = new ClientArguments(argStrings) + val client = new Client(args, sparkConf) + client.start() + + logDebug("Adding shutdown hook") + ShutdownHookManager.addShutdownHook { () => + logInfo("Shutdown hook is shutting down client") + client.stop() + client.awaitShutdown() + } + client.awaitShutdown() + } +} diff --git a/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/ClientArguments.scala b/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/ClientArguments.scala new file mode 100644 index 0000000000000..d4bcd6bf289be --- /dev/null +++ b/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/ClientArguments.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.kubernetes + +import scala.collection.mutable.ArrayBuffer + +private[spark] class ClientArguments(args: Array[String]) { + + var userJar: String = null + var userClass: String = null + var primaryPyFile: String = null + var primaryRFile: String = null + var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]() + + parseArgs(args.toList) + + private def parseArgs(inputArgs: List[String]): Unit = { + var args = inputArgs + + while (!args.isEmpty) { + args match { + case ("--jar") :: value :: tail => + userJar = value + args = tail + + case ("--class") :: value :: tail => + userClass = value + args = tail + + case ("--primary-py-file") :: value :: tail => + primaryPyFile = value + args = tail + + case ("--primary-r-file") :: value :: tail => + primaryRFile = value + args = tail + + case ("--arg") :: value :: tail => + userArgs += value + args = tail + + case Nil => + + case _ => + throw new IllegalArgumentException(getUsageMessage(args)) + } + } + + if (primaryPyFile != null && primaryRFile != null) { + throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" + + " at the same time") + } + } + + private def getUsageMessage(unknownParam: List[String] = null): String = { + val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else "" + message + + s""" + |Usage: org.apache.spark.deploy.kubernetes.Client [options] + |Options: + | --jar JAR_PATH Path to your application's JAR file (required in kubernetes-cluster + | mode) + | --class CLASS_NAME Name of your application's main class (required) + | --primary-py-file A main Python file + | --primary-r-file A main R file + | --arg ARG Argument to be passed to your application's main class. + | Multiple invocations are possible, each will be passed in order. + """.stripMargin + } +} diff --git a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala new file mode 100644 index 0000000000000..a163b5ef8d749 --- /dev/null +++ b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.kubernetes + +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} + +/** + * Cluster Manager for creation of Kubernetes scheduler and backend + */ +private[spark] class KubernetesClusterManager extends ExternalClusterManager { + override def canCreate(masterURL: String): Boolean = { + masterURL.startsWith("k8s") + } + + override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { + new TaskSchedulerImpl(sc) + } + + override def createSchedulerBackend(sc: SparkContext, + masterURL: String, + scheduler: TaskScheduler): SchedulerBackend = { + new KubernetesClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) + } + + override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { + scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) + } +} + diff --git a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala new file mode 100644 index 0000000000000..a9d53c386d0dc --- /dev/null +++ b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.kubernetes + +import java.io.File +import java.util.Date +import java.util.concurrent.atomic.AtomicLong + +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient} +import io.fabric8.kubernetes.api.model.{PodBuilder, ServiceBuilder} +import io.fabric8.kubernetes.client.dsl.LogWatch +import org.apache.spark.deploy.Command +import org.apache.spark.deploy.kubernetes.ClientArguments +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +import scala.collection.JavaConverters._ +import org.apache.spark.util.Utils + +import scala.util.Random + +private[spark] object KubernetesClusterScheduler { + def defaultNameSpace = "default" + def defaultServiceAccountName = "default" +} + +/** + * Scheduler-side client that launches the Spark driver as a pod on a Kubernetes cluster, + * together with a service that exposes the driver UI. + */ +private[spark] class KubernetesClusterScheduler(conf: SparkConf) + extends Logging { + private val DEFAULT_SUPERVISE = false + private val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // mb + private val DEFAULT_CORES = 1.0 + + logInfo("Created KubernetesClusterScheduler instance") + + var client = setupKubernetesClient() + val driverName = s"spark-driver-${Random.alphanumeric take 5 mkString("")}".toLowerCase() + val svcName = s"spark-svc-${Random.alphanumeric take 5 mkString("")}".toLowerCase() + val nameSpace = conf.get( + "spark.kubernetes.namespace", + KubernetesClusterScheduler.defaultNameSpace) + val serviceAccountName = conf.get( + "spark.kubernetes.serviceAccountName", + KubernetesClusterScheduler.defaultServiceAccountName) + + // Settings that either should not be passed on to the driver config in the cluster, or + // that are explicitly managed as command-line arguments to the driver pod + val confBlackList = scala.collection.Set( + "spark.master", + "spark.app.name", + "spark.submit.deployMode", + "spark.executor.jar", + "spark.dynamicAllocation.enabled", + "spark.shuffle.service.enabled") + + def start(args: ClientArguments): Unit = { + startDriver(client, args) + } + + def stop(): Unit = { + client.pods().inNamespace(nameSpace).withName(driverName).delete() + client + .services() + .inNamespace(nameSpace) + .withName(svcName) + .delete() + } + + def startDriver(client: KubernetesClient, + args: ClientArguments): Unit = { + logInfo("Starting Spark driver on Kubernetes cluster") + val driverDescription = buildDriverDescription(args) + + // image needs to support shim scripts "/opt/driver.sh" and "/opt/executor.sh" + val sparkImage = conf.getOption("spark.kubernetes.sparkImage").getOrElse { + // TODO: this needs to default to some standard Apache Spark image + throw new SparkException("Spark image not set. Please configure spark.kubernetes.sparkImage") + } + + // This is the URL of the client jar. + val clientJarUri = args.userJar + + // This is the kubernetes master we're launching on.
+ val kubernetesHost = "k8s://" + client.getMasterUrl().getHost() + logInfo("Using as kubernetes-master: " + kubernetesHost.toString()) + + val submitArgs = scala.collection.mutable.ArrayBuffer.empty[String] + submitArgs ++= Vector( + clientJarUri, + s"--class=${args.userClass}", + s"--master=$kubernetesHost", + s"--executor-memory=${driverDescription.mem}", + s"--conf spark.executor.jar=$clientJarUri") + + submitArgs ++= conf.getAll.filter { case (name, _) => !confBlackList.contains(name) } + .map { case (name, value) => s"--conf ${name}=${value}" } + + if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { + submitArgs ++= Vector( + "--conf spark.dynamicAllocation.enabled=true", + "--conf spark.shuffle.service.enabled=true") + } + + // these have to come at end of arg list + submitArgs ++= Vector("/opt/spark/kubernetes/client.jar", + args.userArgs.mkString(" ")) + + val labelMap = Map("type" -> "spark-driver") + val pod = new PodBuilder() + .withNewMetadata() + .withLabels(labelMap.asJava) + .withName(driverName) + .endMetadata() + .withNewSpec() + .withRestartPolicy("OnFailure") + .withServiceAccount(serviceAccountName) + .addNewContainer() + .withName("spark-driver") + .withImage(sparkImage) + .withImagePullPolicy("Always") + .withCommand(s"/opt/driver.sh") + .withArgs(submitArgs :_*) + .endContainer() + .endSpec() + .build() + client.pods().inNamespace(nameSpace).withName(driverName).create(pod) + + var svc = new ServiceBuilder() + .withNewMetadata() + .withLabels(labelMap.asJava) + .withName(svcName) + .endMetadata() + .withNewSpec() + .addNewPort() + .withPort(4040) + .withNewTargetPort() + .withIntVal(4040) + .endTargetPort() + .endPort() + .withSelector(labelMap.asJava) + .withType("LoadBalancer") + .endSpec() + .build() + + client + .services() + .inNamespace(nameSpace) + .withName(svcName) + .create(svc) + +// try { +// while (true) { +// client +// .pods() +// .inNamespace("default") +// .withName("spark-driver") +// .tailingLines(10) +// .watchLog(System.out) +// Thread.sleep(5 * 1000) +// } +// } catch { +// case e: Exception => logError(e.getMessage) +// } + } + + def setupKubernetesClient(): KubernetesClient = { + val sparkHost = new java.net.URI(conf.get("spark.master")).getHost() + + var config = new ConfigBuilder().withNamespace(nameSpace) + if (sparkHost != "default") { + config = config.withMasterUrl(sparkHost) + } + + // TODO: support k8s user and password options: + // .withTrustCerts(true) + // .withUsername("admin") + // .withPassword("admin") + + new DefaultKubernetesClient(config.build()) + } + + private def buildDriverDescription(args: ClientArguments): KubernetesDriverDescription = { + // Required fields, including the main class because python is not yet supported + val appResource = Option(args.userJar).getOrElse { + throw new SparkException("Application jar is missing.") + } + val mainClass = Option(args.userClass).getOrElse { + throw new SparkException("Main class is missing.") + } + + // Optional fields + val driverExtraJavaOptions = conf.getOption("spark.driver.extraJavaOptions") + val driverExtraClassPath = conf.getOption("spark.driver.extraClassPath") + val driverExtraLibraryPath = conf.getOption("spark.driver.extraLibraryPath") + val superviseDriver = conf.getOption("spark.driver.supervise") + val driverMemory = conf.getOption("spark.driver.memory") + val driverCores = conf.getOption("spark.driver.cores") + val name = conf.getOption("spark.app.name").getOrElse("default") + val appArgs = args.userArgs + + // Construct driver description + val 
extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator)) + val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator)) + val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) + val sparkJavaOpts = Utils.sparkJavaOpts(conf) + val javaOpts = sparkJavaOpts ++ extraJavaOpts + val command = new Command( + mainClass, appArgs, null, extraClassPath, extraLibraryPath, javaOpts) + val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) + val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) + val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES) + val submitDate = new Date() + + new KubernetesDriverDescription( + name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, + command, submitDate) + } +} diff --git a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala new file mode 100644 index 0000000000000..bf83189e5e2f6 --- /dev/null +++ b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.kubernetes + +import collection.JavaConverters._ +import io.fabric8.kubernetes.api.model.PodBuilder +import io.fabric8.kubernetes.api.model.extensions.JobBuilder +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} +import org.apache.spark.internal.config._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster._ +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.{SparkConf, SparkContext, SparkException} +import org.apache.spark.rpc.RpcEndpointAddress +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.Utils + +import scala.collection.mutable +import scala.util.Random +import scala.concurrent.Future + +private[spark] class KubernetesClusterSchedulerBackend( + scheduler: TaskSchedulerImpl, + sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { + + val client = new DefaultKubernetesClient() + + val DEFAULT_NUMBER_EXECUTORS = 2 + val sparkExecutorName = s"spark-executor-${Random.alphanumeric take 5 mkString("")}".toLowerCase() + + // TODO: do these need mutex guarding? 
+ // key is executor id, value is pod name + var executorToPod = mutable.Map.empty[String, String] // active executors + var shutdownToPod = mutable.Map.empty[String, String] // pending shutdown + var executorID = 0 + + val sparkImage = conf.get("spark.kubernetes.sparkImage") + val clientJarUri = conf.get("spark.executor.jar") + val ns = conf.get( + "spark.kubernetes.namespace", + KubernetesClusterScheduler.defaultNameSpace) + val dynamicExecutors = Utils.isDynamicAllocationEnabled(conf) + + // executor back-ends take their configuration this way + if (dynamicExecutors) { + conf.setExecutorEnv("spark.dynamicAllocation.enabled", "true") + conf.setExecutorEnv("spark.shuffle.service.enabled", "true") + } + + override def start(): Unit = { + super.start() + createExecutorPods(getInitialTargetExecutorNumber(sc.getConf)) + } + + override def stop(): Unit = { + // Kill all executor pods indiscriminately + killExecutorPods(executorToPod.toVector) + killExecutorPods(shutdownToPod.toVector) + super.stop() + } + + // Dynamic allocation interfaces + override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { + logInfo(s"Received doRequestTotalExecutors: $requestedTotal") + val n = executorToPod.size + val delta = requestedTotal - n + if (delta > 0) { + logInfo(s"Adding $delta new executors") + createExecutorPods(delta) + } else if (delta < 0) { + val d = -delta + val idle = executorToPod.toVector.filter { case (id, _) => !scheduler.isExecutorBusy(id) } + if (idle.length > 0) { + logInfo(s"Shutting down ${idle.length} idle executors") + shutdownExecutors(idle.take(d)) + } + val r = math.max(0, d - idle.length) + if (r > 0) { + logInfo(s"Shutting down $r non-idle executors") + shutdownExecutors(executorToPod.toVector.slice(n - r, n)) + } + } + // TODO: are there meaningful failure modes here? + Future.successful(true) + } + + override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = { + logInfo(s"Received doKillExecutors") + killExecutorPods(executorIds.map { id => (id, executorToPod(id)) }) + Future.successful(true) + } + + private def createExecutorPods(n: Int) { + for (i <- 1 to n) { + executorID += 1 + executorToPod += ((executorID.toString, createExecutorPod(executorID))) + } + } + + def shutdownExecutors(idPodPairs: Seq[(String, String)]) { + val active = getExecutorIds.toSet + + // Check for any finished shutting down and kill the pods + val shutdown = shutdownToPod.toVector.filter { case (e, _) => !active.contains(e) } + killExecutorPods(shutdown) + + // Now request shutdown for the new ones. + // Move them from executor list to list pending shutdown + for ((id, pod) <- idPodPairs) { + try { + // TODO: 'ask' returns a future - can it be used to check eventual success? 
+ Option(driverEndpoint).foreach(_.ask[Boolean](RemoveExecutor(id, ExecutorKilled))) + executorToPod -= id + shutdownToPod += ((id, pod)) + } catch { + case e: Exception => logError(s"Error shutting down executor $id", e) + } + } + } + + private def killExecutorPods(idPodPairs: Seq[(String, String)]) { + for ((id, pod) <- idPodPairs) { + try { + client.pods().inNamespace(ns).withName(pod).delete() + executorToPod -= id + shutdownToPod -= id + } catch { + case e: Exception => logError(s"Error killing executor pod $pod", e) + } + } + } + + def getInitialTargetExecutorNumber(conf: SparkConf, + numExecutors: Int = + DEFAULT_NUMBER_EXECUTORS): Int = { + if (dynamicExecutors) { + val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS) + val initialNumExecutors = + Utils.getDynamicAllocationInitialExecutors(conf) + val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) + require( + initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, + s"initial executor number $initialNumExecutors must be between min executor number " + + s"$minNumExecutors and max executor number $maxNumExecutors") + + initialNumExecutors + } else { + conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors) + } + } + + def createExecutorPod(executorNum: Int): String = { + // create a single k8s executor pod. + val labelMap = Map("type" -> "spark-executor") + val podName = s"$sparkExecutorName-$executorNum" + + val submitArgs = mutable.ArrayBuffer.empty[String] + + if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { + submitArgs ++= Vector( + "dynamic-executors") + } + + submitArgs ++= Vector("org.apache.spark.executor.CoarseGrainedExecutorBackend", + "--driver-url", s"$driverURL", + "--executor-id", s"$executorNum", + "--hostname", "localhost", + "--app-id", "1", // TODO: change app-id per application and pass from driver. + "--cores", "1") + + var pod = new PodBuilder() + .withNewMetadata() + .withLabels(labelMap.asJava) + .withName(podName) + .endMetadata() + .withNewSpec() + .withRestartPolicy("OnFailure") + + .addNewContainer().withName("spark-executor").withImage(sparkImage) + .withImagePullPolicy("IfNotPresent") + .withCommand("/opt/executor.sh") + .withArgs(submitArgs :_*) + .endContainer() + + .endSpec().build() + client.pods().inNamespace(ns).withName(podName).create(pod) + + podName + } + + protected def driverURL: String = { + if (conf.contains("spark.testing")) { + "driverURL" + } else { + RpcEndpointAddress( + conf.get("spark.driver.host"), + conf.get("spark.driver.port").toInt, + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + } + } + + override def getDriverLogUrls: Option[Map[String, String]] = { + var driverLogs: Option[Map[String, String]] = None + driverLogs + } +} diff --git a/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesDriverDescription.scala b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesDriverDescription.scala new file mode 100644 index 0000000000000..00ac7ea4da08a --- /dev/null +++ b/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesDriverDescription.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.kubernetes + +import java.util.Date + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.Command + +private[spark] class KubernetesDriverDescription( + val name: String, + val jarUrl: String, + val mem: Int, + val cores: Double, + val supervise: Boolean, + val command: Command, + val submissionDate: Date) + extends Serializable { + val conf = new SparkConf(false) + + +} diff --git a/pom.xml b/pom.xml index c391102d37502..908a56e0d8765 100644 --- a/pom.xml +++ b/pom.xml @@ -2592,6 +2592,13 @@ + <profile> + <id>kubernetes</id> + <modules> + <module>kubernetes</module> + </modules> + </profile> + <id>hive-thriftserver</id> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index e3fbe0379fb7b..c287695bbed1d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -56,9 +56,9 @@ object BuildCommons { "tags", "sketch" ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects - val optionallyEnabledProjects@Seq(mesos, yarn, java8Tests, sparkGangliaLgpl, + val optionallyEnabledProjects@Seq(mesos, yarn, kubernetes, java8Tests, sparkGangliaLgpl, streamingKinesisAsl, dockerIntegrationTests) = - Seq("mesos", "yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl", + Seq("mesos", "yarn", "kubernetes", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl", "docker-integration-tests").map(ProjectRef(buildLocation, _)) val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) = diff --git a/sbin/driver.yaml b/sbin/driver.yaml new file mode 100644 index 0000000000000..de806599c2c5c --- /dev/null +++ b/sbin/driver.yaml @@ -0,0 +1,7 @@ +apiVersion: "kubernetes.io/v1" +kind: SparkJob +metadata: + name: spark-driver-1924 +image: "driver-image" +state: "completed" +num-executors: 10 \ No newline at end of file diff --git a/sbin/kubernetes-resource.yaml b/sbin/kubernetes-resource.yaml new file mode 100644 index 0000000000000..58d2072c0622b --- /dev/null +++ b/sbin/kubernetes-resource.yaml @@ -0,0 +1,10 @@ +metadata: + name: spark-job.kubernetes.io + labels: + resource: spark-job + object: spark +apiVersion: extensions/v1beta1 +kind: ThirdPartyResource +description: "A resource that manages a spark job" +versions: + - name: v1 \ No newline at end of file
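Note: the two YAML files under sbin/ are not referenced by any code in this change; they only sketch a possible `SparkJob` ThirdPartyResource. If exercised manually, the resource type would be registered before creating instances, along the lines of `kubectl create -f sbin/kubernetes-resource.yaml` followed by `kubectl create -f sbin/driver.yaml` (illustrative usage, not something this patch performs automatically).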