diff --git a/assembly/pom.xml b/assembly/pom.xml
index ec243eaebaea7..db8ac768a877f 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -148,6 +148,16 @@
+    <profile>
+      <id>kubernetes</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>hive</id>
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java
new file mode 100644
index 0000000000000..d473c5889d8c4
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.shuffle.kubernetes;
+
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.shuffle.ExternalShuffleClient;
+import org.apache.spark.network.shuffle.protocol.kubernetes.ApplicationComplete;
+import org.apache.spark.network.util.TransportConf;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class KubernetesExternalShuffleClient extends ExternalShuffleClient {
+
+  public KubernetesExternalShuffleClient(
+      TransportConf conf,
+      SecretKeyHolder secretKeyHolder,
+      boolean saslEnabled,
+      boolean saslEncryptionEnabled) {
+    super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
+  }
+
+  public void sendApplicationComplete(String host, int port) throws IOException {
+    checkInit();
+    ByteBuffer applicationComplete = new ApplicationComplete(appId).toByteBuffer();
+    TransportClient client = clientFactory.createClient(host, port);
+    try {
+      client.send(applicationComplete);
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 9af6759f5d5f3..e908e1b1bda08 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -23,6 +23,7 @@
 import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.shuffle.protocol.kubernetes.ApplicationComplete;
 import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
 import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
 
@@ -42,7 +43,7 @@ public abstract class BlockTransferMessage implements Encodable {
/** Preceding every serialized message is its type, which allows us to deserialize it. */ public enum Type { OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4), - HEARTBEAT(5); + HEARTBEAT(5), APPLICATION_COMPLETE(6); private final byte id; @@ -67,6 +68,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) { case 3: return StreamHandle.decode(buf); case 4: return RegisterDriver.decode(buf); case 5: return ShuffleServiceHeartbeat.decode(buf); + case 6: return ApplicationComplete.decode(buf); default: throw new IllegalArgumentException("Unknown message type: " + type); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/kubernetes/ApplicationComplete.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/kubernetes/ApplicationComplete.java new file mode 100644 index 0000000000000..74b9adfaac58d --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/kubernetes/ApplicationComplete.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.network.shuffle.protocol.kubernetes; + +import io.netty.buffer.ByteBuf; +import org.apache.spark.network.protocol.Encoders; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; + +import java.util.Objects; + +public class ApplicationComplete extends BlockTransferMessage { + + private final String appId; + + public ApplicationComplete(String appId) { + this.appId = appId; + } + + @Override + protected Type type() { + return Type.APPLICATION_COMPLETE; + } + + @Override + public int encodedLength() { + return Encoders.Strings.encodedLength(appId); + } + + @Override + public void encode(ByteBuf buf) { + Encoders.Strings.encode(buf, appId); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ApplicationComplete)) { + return false; + } + return Objects.equals(appId, ((ApplicationComplete) other).appId); + } + + @Override + public int hashCode() { + return Objects.hashCode(appId); + } + + public String getAppId() { + return appId; + } + + public static ApplicationComplete decode(ByteBuf buf) { + String appId = Encoders.Strings.decode(buf); + return new ApplicationComplete(appId); + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 5c052286099f5..37e363fd88b78 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -70,7 +70,8 @@ object SparkSubmit { private val STANDALONE = 2 private val MESOS = 4 private val LOCAL = 8 - private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL + private val KUBERNETES = 16 + private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES // Deploy modes private val CLIENT = 1 @@ -237,6 +238,7 @@ object SparkSubmit { printWarning(s"Master ${args.master} is deprecated since 2.0." + " Please use master \"yarn\" with specified deploy mode instead.") YARN + case m if m.startsWith("kubernetes") => KUBERNETES case m if m.startsWith("spark") => STANDALONE case m if m.startsWith("mesos") => MESOS case m if m.startsWith("local") => LOCAL @@ -276,6 +278,14 @@ object SparkSubmit { } } + if (clusterManager == KUBERNETES + && !Utils.classIsLoadable("org.apache.spark.deploy.kubernetes.Client") + && !Utils.isTesting) { + printErrorAndExit( + "Could not load Kubernetes classes. " + + "This copy of Spark may not have been compiled with Kubernetes support.") + } + // Update args.deployMode if it is null. It will be passed down as a Spark property later. 
(args.deployMode, deployMode) match { case (null, CLIENT) => args.deployMode = "client" @@ -347,6 +357,12 @@ object SparkSubmit { printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.") case (_, CLUSTER) if isThriftServer(args.mainClass) => printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.") + case (KUBERNETES, CLIENT) => + printErrorAndExit("Client mode is currently not supported for Kubernetes.") + case (KUBERNETES, CLUSTER) if args.isPython => + printErrorAndExit("Python is currently not supported for Kubernetes.") + case (KUBERNETES, CLUSTER) if args.isR => + printErrorAndExit("R is currently not supported for Kubernetes.") case _ => } @@ -466,6 +482,26 @@ object SparkSubmit { OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"), OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"), + // Kubernetes only + OptionAssigner(args.kubernetesAppName, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.app.name"), + OptionAssigner(args.kubernetesAppNamespace, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.namespace"), + OptionAssigner(args.kubernetesCaCertFile, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.ca.cert.file"), + OptionAssigner(args.kubernetesClientCertFile, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.client.cert.file"), + OptionAssigner(args.kubernetesClientKeyFile, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.client.key.file"), + OptionAssigner(args.kubernetesMaster, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.master"), + OptionAssigner(args.customExecutorSpecFile, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.executor.custom.spec.file"), + OptionAssigner(args.customExecutorSpecContainerName, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.executor.custom.spec.container.name"), + OptionAssigner(args.executorDockerImage, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.executor.docker.image"), + // Other options OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.cores"), @@ -486,6 +522,16 @@ object SparkSubmit { OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy") ) + if (args.isKubernetesCluster) { + childMainClass = "org.apache.spark.deploy.kubernetes.Client" + for ((portName, portValue) <- args.exposeDriverPorts) { + childArgs += ("--expose-driver-port", s"$portName=$portValue") + } + args.childArgs.foreach(arg => childArgs += ("--arg", arg)) + childArgs += ("--class", args.mainClass) + childArgs += ("--driver-docker-image", args.driverDockerImage) + } + // In client mode, launch the application main class directly // In addition, add the main application jar and any added jars (if any) to the classpath if (deployMode == CLIENT) { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index f1761e7c1ec92..0ef670ab67ee8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -78,6 +78,19 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S var submissionToRequestStatusFor: String = null var useRest: Boolean = true // used internally + // Kubernetes only + var kubernetesAppName: String = null + val exposeDriverPorts: HashMap[String, Int] = new HashMap[String, Int]() + var driverDockerImage: String = null + var 
executorDockerImage: String = null
+  var customExecutorSpecFile: String = null
+  var customExecutorSpecContainerName: String = null
+  var kubernetesMaster: String = null
+  var kubernetesAppNamespace: String = null
+  var kubernetesClientCertFile: String = null
+  var kubernetesClientKeyFile: String = null
+  var kubernetesCaCertFile: String = null
+
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
@@ -287,6 +300,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     master.startsWith("spark://") && deployMode == "cluster"
   }
 
+  def isKubernetesCluster: Boolean = {
+    master.startsWith("kubernetes") && deployMode == "cluster"
+  }
+
   override def toString: String = {
     s"""Parsed arguments:
     |  master                  $master
@@ -438,6 +455,43 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case USAGE_ERROR =>
         printUsageAndExit(1)
 
+      // Kubernetes only
+      case KUBERNETES_APP_NAME =>
+        kubernetesAppName = value
+
+      case KUBERNETES_APP_NAMESPACE =>
+        kubernetesAppNamespace = value
+
+      case KUBERNETES_CA_CERT_FILE =>
+        kubernetesCaCertFile = value
+
+      case KUBERNETES_CLIENT_CERT_FILE =>
+        kubernetesClientCertFile = value
+
+      case KUBERNETES_CLIENT_KEY_FILE =>
+        kubernetesClientKeyFile = value
+
+      case KUBERNETES_CUSTOM_EXECUTOR_SPEC_CONTAINER_NAME =>
+        customExecutorSpecContainerName = value
+
+      case KUBERNETES_CUSTOM_EXECUTOR_SPEC_FILE =>
+        customExecutorSpecFile = value
+
+      case KUBERNETES_DRIVER_DOCKER_IMAGE =>
+        driverDockerImage = value
+
+      case KUBERNETES_EXECUTOR_DOCKER_IMAGE =>
+        executorDockerImage = value
+
+      case KUBERNETES_EXPOSE_DRIVER_PORT =>
+        value.split("=", 2).toSeq match {
+          case Seq(k, v) => exposeDriverPorts(k) = v.toInt
+          case _ => SparkSubmit.printErrorAndExit(s"Driver port specified without '=': $value")
+        }
+
+      case KUBERNETES_MASTER =>
+        kubernetesMaster = value
+
       case _ =>
         throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
     }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 7eec4ae64f296..8b8f13b8f0ab6 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -200,7 +200,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       new SecurityManager(executorConf),
       clientMode = true)
     val driver = fetcher.setupEndpointRefByURI(driverUrl)
-    val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
+    val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps(executorId)) ++
       Seq[(String, String)](("spark.app.id", appId))
     fetcher.shutdown()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index edc8aac5d1515..52cdad96bfa80 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -28,10 +28,10 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 
 private[spark] object CoarseGrainedClusterMessages {
 
-  case object RetrieveSparkProps extends CoarseGrainedClusterMessage
-
   case
object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage + case class RetrieveSparkProps(executorId: String) extends CoarseGrainedClusterMessage + // Driver to executors case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 10d55c87fb8de..3e649cdb95054 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -89,9 +89,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp @GuardedBy("CoarseGrainedSchedulerBackend.this") protected var localityAwareTasks = 0 - // The num of current max ExecutorId used to re-register appMaster - @volatile protected var currentExecutorIdCounter = 0 - class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { @@ -157,10 +154,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // If the executor's rpc env is not listening for incoming connections, `hostPort` // will be null, and the client connection should be used to contact the executor. val executorAddress = if (executorRef.address != null) { - executorRef.address - } else { - context.senderAddress - } + executorRef.address + } else { + context.senderAddress + } logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId") addressToExecutorId(executorAddress) = executorId totalCoreCount.addAndGet(cores) @@ -171,9 +168,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { executorDataMap.put(executorId, data) - if (currentExecutorIdCounter < executorId.toInt) { - currentExecutorIdCounter = executorId.toInt - } + registerExecutorId(executorId) if (numPendingExecutors > 0) { numPendingExecutors -= 1 logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") @@ -206,7 +201,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeExecutor(executorId, reason) context.reply(true) - case RetrieveSparkProps => + case RetrieveSparkProps(executorHostname) => context.reply(sparkProperties) } @@ -444,6 +439,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ private def numExistingExecutors: Int = executorDataMap.size + // Only overridden by YARN to handle executor ID counting. 
+  protected def registerExecutorId(executorId: String): Unit = {}
+
   override def getExecutorIds(): Seq[String] = {
     executorDataMap.keySet.toSeq
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 982b83324e0fc..c774400505573 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -185,7 +185,12 @@ private[spark] class BlockManager(
     shuffleServerId = if (externalShuffleServiceEnabled) {
       logInfo(s"external shuffle service port = $externalShuffleServicePort")
-      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
+      val shuffleServerHostName = if (blockManagerId.isDriver) {
+        blockTransferService.hostName
+      } else {
+        conf.get("spark.shuffle.service.host", blockTransferService.hostName)
+      }
+      BlockManagerId(executorId, shuffleServerHostName, externalShuffleServicePort)
     } else {
       blockManagerId
     }
diff --git a/dev/scalastyle b/dev/scalastyle
index f3dec833636c6..de7423913fad9 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -26,6 +26,8 @@ ERRORS=$(echo -e "q\n" \
     -Pyarn \
     -Phive \
     -Phive-thriftserver \
+    -Pkubernetes \
+    -Pkubernetes-integration-tests \
     scalastyle test:scalastyle \
     | awk '{if($1~/error/)print}' \
)
diff --git a/kubernetes/core/pom.xml b/kubernetes/core/pom.xml
new file mode 100644
index 0000000000000..4f5a94e2a6630
--- /dev/null
+++ b/kubernetes/core/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-kubernetes_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Kubernetes</name>
+  <properties>
+    <sbt.project.name>kubernetes</sbt.project.name>
+    <kubernetes.client.version>1.4.17</kubernetes.client.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.jaxrs</groupId>
+      <artifactId>jackson-jaxrs-json-provider</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+      <version>${kubernetes.client.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.netflix.feign</groupId>
+      <artifactId>feign-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.netflix.feign</groupId>
+      <artifactId>feign-okhttp</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.netflix.feign</groupId>
+      <artifactId>feign-jackson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.netflix.feign</groupId>
+      <artifactId>feign-jaxrs</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.ws.rs</groupId>
+          <artifactId>jsr311-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>javax.ws.rs</groupId>
+      <artifactId>javax.ws.rs-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-plus</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlets</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
diff --git a/kubernetes/core/src/main/java/org/apache/spark/deploy/kubernetes/shuffle/KubernetesShuffleBlockHandler.java b/kubernetes/core/src/main/java/org/apache/spark/deploy/kubernetes/shuffle/KubernetesShuffleBlockHandler.java
new file mode 100644
index 0000000000000..8267fc5a3ae13
--- /dev/null
+++ b/kubernetes/core/src/main/java/org/apache/spark/deploy/kubernetes/shuffle/KubernetesShuffleBlockHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.shuffle; + +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; +import org.apache.spark.network.shuffle.protocol.kubernetes.ApplicationComplete; +import org.apache.spark.network.util.TransportConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +public class KubernetesShuffleBlockHandler extends ExternalShuffleBlockHandler { + private static final Logger logger = LoggerFactory.getLogger(KubernetesShuffleBlockHandler.class); + + private final KubernetesShuffleServiceSecurityManager securityManager; + + public KubernetesShuffleBlockHandler( + TransportConf conf, + File registeredExecutorFile, + KubernetesShuffleServiceSecurityManager securityManager) throws IOException { + super(conf, registeredExecutorFile); + this.securityManager = securityManager; + } + + @Override + protected void handleMessage( + BlockTransferMessage msgObj, + TransportClient client, + RpcResponseCallback callback) { + if (msgObj instanceof ApplicationComplete) { + String appId = ((ApplicationComplete) msgObj).getAppId(); + logger.info("Received application complete message for app: " + appId); + securityManager.applicationComplete(appId); + } else { + super.handleMessage(msgObj, client, callback); + } + } +} diff --git a/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager new file mode 100644 index 0000000000000..55e7e38b28a08 --- /dev/null +++ b/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager @@ -0,0 +1 @@ +org.apache.spark.scheduler.cluster.kubernetes.KubernetesClusterManager diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala new file mode 100644 index 0000000000000..1e63e897e9a26 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import java.io.{File, FileInputStream} +import java.nio.file.{Files, Paths} +import java.security.SecureRandom +import java.util.concurrent.{Executors, TimeUnit} +import javax.net.ssl.X509TrustManager + +import com.google.common.util.concurrent.{SettableFuture, ThreadFactoryBuilder} +import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, Pod, ServicePort, ServicePortBuilder} +import io.fabric8.kubernetes.client._ +import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.internal.SSLUtils +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.io.IOUtils +import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext} +import scala.concurrent.duration.DurationInt +import scala.util.Success + +import org.apache.spark.deploy.kubernetes.driverlauncher.{KubernetesDriverLauncherService, KubernetesSparkDriverConfiguration} +import org.apache.spark.deploy.kubernetes.httpclients.HttpClientUtil +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[spark] class Client(private val clientArgs: ClientArguments) + extends Logging { + import Client._ + private val namespace = clientArgs.kubernetesAppNamespace + private val launchTime = System.currentTimeMillis + private val kubernetesAppId = clientArgs.kubernetesAppName.getOrElse(s"spark-$launchTime") + private val secretName = s"driver-launcher-service-secret-$kubernetesAppId" + private val driverLauncherSelectorValue = s"driver-launcher-$launchTime" + + private val secretBytes = new Array[Byte](128) + SECURE_RANDOM.nextBytes(secretBytes) + private val secretBase64String = Base64.encodeBase64String(secretBytes) + private implicit val retryableExecutionContext = ExecutionContext + .fromExecutorService( + Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat("kubernetes-client-retryable-futures-%d") + .setDaemon(true) + .build())) + + def run(): Unit = { + var k8ConfBuilder = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(clientArgs.kubernetesMaster) + .withNamespace(namespace) + clientArgs.kubernetesCaCertFile.foreach { f => k8ConfBuilder = k8ConfBuilder.withCaCertFile(f)} + clientArgs.kubernetesClientKeyFile.foreach(file => + k8ConfBuilder = k8ConfBuilder.withClientKeyFile(file)) + clientArgs.kubernetesClientCertFile.foreach(file => + k8ConfBuilder = k8ConfBuilder.withClientCertFile(file)) + + val k8ClientConfig = k8ConfBuilder.build + Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig))(kubernetesClient => { + val secret = kubernetesClient.secrets().createNew() + .withNewMetadata() + .withName(secretName) + .endMetadata() + .withData(Map((DRIVER_LAUNCHER_SECRET_NAME, secretBase64String)).asJava) + .withType("Opaque") + .done() + try { + val driverConfiguration = buildSparkDriverConfiguration() + val selectors = Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue).asJava + val uiPort = driverConfiguration + .sparkConf + .get("spark.ui.port") + .map(_.toInt) + .getOrElse(DEFAULT_UI_PORT) + val (servicePorts, containerPorts) = configurePorts(uiPort) + + val service = kubernetesClient.services().createNew() + .withNewMetadata() + .withName(kubernetesAppId) + .endMetadata() + .withNewSpec() + .withSelector(selectors) + .withPorts(servicePorts.asJava) + .endSpec() + 
.done() + + val submitCompletedFuture = SettableFuture.create[Boolean] + + val podWatcher = new Watcher[Pod] { + override def eventReceived(action: Action, t: Pod): Unit = { + if ((action == Action.ADDED || action == Action.MODIFIED) + && t.getStatus.getPhase == "Running" + && !submitCompletedFuture.isDone) { + t.getStatus + .getContainerStatuses + .asScala + .find(status => + status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match { + case Some(status) => + try { + val driverLauncher = getDriverLauncherService(k8ClientConfig) + val ping = Retry.retry(5, 5.seconds) { + driverLauncher.ping() + } + ping onFailure { + case t: Throwable => + if (!submitCompletedFuture.isDone) { + submitCompletedFuture.setException(t) + } + } + val submitComplete = ping andThen { + case Success(_) => + driverLauncher.submitApplication(secretBase64String, driverConfiguration) + submitCompletedFuture.set(true) + } + submitComplete onFailure { + case t: Throwable => + if (!submitCompletedFuture.isDone) { + submitCompletedFuture.setException(t) + } + } + } catch { + case e: Throwable => + if (!submitCompletedFuture.isDone) { + submitCompletedFuture.setException(e) + throw e + } + } + case None => + } + } + } + + override def onClose(e: KubernetesClientException): Unit = { + if (!submitCompletedFuture.isDone) { + submitCompletedFuture.setException(e) + } + } + } + + Utils.tryWithResource(kubernetesClient + .pods() + .withLabels(selectors) + .watch(podWatcher)) { _ => + kubernetesClient.pods().createNew() + .withNewMetadata() + .withName(kubernetesAppId) + .withLabels(selectors) + .endMetadata() + .withNewSpec() + .addNewVolume() + .withName(s"spark-app-secret-volume") + .withNewSecret() + .withSecretName(secret.getMetadata.getName) + .endSecret() + .endVolume + .addNewContainer() + .withName(DRIVER_LAUNCHER_CONTAINER_NAME) + .withImage(clientArgs.driverDockerImage) + .withImagePullPolicy("Never") + .addNewVolumeMount() + .withName("spark-app-secret-volume") + .withReadOnly(true) + .withMountPath("/opt/spark/spark-app-secret") + .endVolumeMount() + .addNewEnv() + .withName("SPARK_DRIVER_LAUNCHER_SERVICE_NAME") + .withValue(kubernetesAppId) + .endEnv() + .addNewEnv() + .withName("SPARK_APP_SECRET_LOCATION") + .withValue(s"/opt/spark/spark-app-secret/$DRIVER_LAUNCHER_SECRET_NAME") + .endEnv() + .addNewEnv() + .withName("SPARK_DRIVER_LAUNCHER_SERVER_PORT") + .withValue(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT.toString) + .endEnv() + .withPorts(containerPorts.asJava) + .endContainer() + .withRestartPolicy("OnFailure") + .endSpec() + .done() + submitCompletedFuture.get(30, TimeUnit.SECONDS) + } + } finally { + kubernetesClient.secrets().delete(secret) + } + }) + } + + private def getDriverLauncherService(k8ClientConfig: Config): KubernetesDriverLauncherService = { + val url = s"${ + Array[String]( + clientArgs.kubernetesMaster, + "api", "v1", "proxy", + "namespaces", namespace, + "services", kubernetesAppId).mkString("/") + }" + + s":$DRIVER_LAUNCHER_SERVICE_PORT_NAME/api" + + val sslContext = SSLUtils.sslContext(k8ClientConfig) + val trustManager = SSLUtils.trustManagers( + k8ClientConfig)(0).asInstanceOf[X509TrustManager] + val driverLauncher = HttpClientUtil.createClient[KubernetesDriverLauncherService]( + url, sslContext.getSocketFactory, trustManager) + driverLauncher + } + + private def buildSparkDriverConfiguration(): KubernetesSparkDriverConfiguration = { + // Set up Spark application configurations, classpath, and resource uploads + val driverClassPathUris = clientArgs + .driverExtraClassPath + 
.map(uri => Utils.resolveURI(uri)) + val kubernetesSparkProperties = Map[String, String]( + "spark.master" -> "kubernetes", + "spark.kubernetes.master" -> clientArgs.kubernetesMaster, + "spark.kubernetes.executor.docker.image" -> clientArgs.executorDockerImage, + "spark.kubernetes.namespace" -> clientArgs.kubernetesAppNamespace, + "spark.kubernetes.app.id" -> kubernetesAppId, + "spark.kubernetes.driver.service.name" -> kubernetesAppId, + "spark.blockManager.port" -> BLOCKMANAGER_PORT.toString, + "spark.driver.port" -> DRIVER_PORT.toString) + val combinedSparkConf = clientArgs.sparkConf ++ kubernetesSparkProperties + val jarUris = clientArgs.jars.map(uri => Utils.resolveURI(uri)) + + val localJarsToUpload = jarUris.union(driverClassPathUris) + .distinct + .filter(uri => uri.getScheme == "file" || uri.getScheme == "local") + .map(uri => { + val jarRawContents = Files.readAllBytes(Paths.get(uri.getPath)) + val jarBase64Contents = Base64.encodeBase64String(jarRawContents) + uri.toString -> jarBase64Contents + }).toMap + + val customExecutorSpecBase64 = clientArgs.customExecutorSpecFile.map(filePath => { + val file = new File(filePath) + if (!file.isFile) { + throw new IllegalArgumentException("Custom executor spec file was provided as " + + s"$filePath, but no file is available at that path.") + } + Utils.tryWithResource(new FileInputStream(file)) { is => + Base64.encodeBase64String(IOUtils.toByteArray(is)) + } + }) + KubernetesSparkDriverConfiguration( + localJarsToUpload, + driverClassPathUris.map(_.toString).toSeq, + jarUris.map(_.toString).toSeq, + combinedSparkConf, + clientArgs.userArgs.toSeq, + customExecutorSpecBase64, + clientArgs.customExecutorSpecContainerName, + clientArgs.userMainClass) + } + + private def configurePorts(uiPort: Int): (Seq[ServicePort], Seq[ContainerPort]) = { + val servicePorts = new ArrayBuffer[ServicePort] + val containerPorts = new ArrayBuffer[ContainerPort] + + def addPortToServiceAndContainer(portName: String, portValue: Int): Unit = { + servicePorts += new ServicePortBuilder() + .withName(portName) + .withPort(portValue) + .withNewTargetPort(portValue) + .build() + containerPorts += new ContainerPortBuilder() + .withContainerPort(portValue) + .build() + } + + addPortToServiceAndContainer( + DRIVER_LAUNCHER_SERVICE_PORT_NAME, + DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) + addPortToServiceAndContainer( + DRIVER_PORT_NAME, + DRIVER_PORT) + addPortToServiceAndContainer( + BLOCKMANAGER_PORT_NAME, + BLOCKMANAGER_PORT) + + addPortToServiceAndContainer(UI_PORT_NAME, uiPort) + clientArgs + .additionalDriverPorts + .foreach(port => addPortToServiceAndContainer(port._1, port._2)) + (servicePorts.toSeq, containerPorts.toSeq) + } +} + +private object Client { + + private val DRIVER_LAUNCHER_SECRET_NAME = "driver-launcher-secret" + private val DRIVER_LAUNCHER_SELECTOR_LABEL = "driver-launcher-selector" + private val APPLICATION_SECRET_ENV = "DRIVER_LAUNCHER_APPLICATION_SECRET" + private val DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT = 7077 + private val DRIVER_PORT = 7078 + private val BLOCKMANAGER_PORT = 7079 + private val DEFAULT_UI_PORT = 4040 + private val UI_PORT_NAME = "spark-ui-port" + private val DRIVER_LAUNCHER_SERVICE_PORT_NAME = "driver-launcher-port" + private val DRIVER_PORT_NAME = "driver-port" + private val BLOCKMANAGER_PORT_NAME = "block-manager-port" + private val DRIVER_LAUNCHER_CONTAINER_NAME = "spark-kubernetes-driver-launcher" + + private val SECURE_RANDOM = new SecureRandom() + + def main(argStrings: Array[String]): Unit = { + val args = 
ClientArguments.builder().fromArgsArray(argStrings).build() + new Client(args).run() + } +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ClientArguments.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ClientArguments.scala new file mode 100644 index 0000000000000..1b6c682bdae7a --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ClientArguments.scala @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import org.apache.commons.lang3.Validate +import scala.collection.mutable + +import org.apache.spark.launcher.SparkSubmitArgumentsParser +import org.apache.spark.SparkConf + +private[spark] case class ClientArguments( + kubernetesAppName: Option[String], + userMainClass: String, + userArgs: Array[String], + jars: Array[String], + additionalDriverPorts: Map[String, Int], + sparkConf: Map[String, String], + driverExtraClassPath: Array[String], + driverDockerImage: String, + executorDockerImage: String, + customExecutorSpecFile: Option[String], + customExecutorSpecContainerName: Option[String], + kubernetesMaster: String, + kubernetesAppNamespace: String, + kubernetesClientCertFile: Option[String] = Option.empty, + kubernetesClientKeyFile: Option[String] = Option.empty, + kubernetesCaCertFile: Option[String] = Option.empty) + +private[spark] object ClientArguments { + def builder(): ClientArgumentsBuilder = new ClientArgumentsBuilder +} + +private[spark] case class ClientArgumentsBuilder( + kubernetesAppName: Option[String] = None, + userMainClass: Option[String] = Option.empty[String], + userArgs: Seq[String] = Seq.empty[String], + jars: Seq[String] = Seq.empty[String], + driverExtraClassPath: Seq[String] = Seq.empty[String], + additionalDriverPorts: Map[String, Int] = Map[String, Int](), + sparkConf: Map[String, String] = Map[String, String](), + driverDockerImage: Option[String] = None, // TODO use the precise version + executorDockerImage: Option[String] = None, + customExecutorSpecFile: Option[String] = None, + customExecutorSpecContainerName: Option[String] = None, + kubernetesMaster: Option[String] = None, + kubernetesAppNamespace: Option[String] = None, + kubernetesClientCertFile: Option[String] = None, + kubernetesClientKeyFile: Option[String] = None, + kubernetesCaCertFile: Option[String] = None) extends SparkSubmitArgumentsParser { + + def kubernetesAppName(kubernetesAppName: String): ClientArgumentsBuilder = { + this.copy(kubernetesAppName = Some(Validate.notBlank(kubernetesAppName))) + } + + def userMainClass(userMainClass: String): ClientArgumentsBuilder = { + this.copy(userMainClass = Some(Validate.notBlank(userMainClass))) + } + + def addUserArg(userArg: String): 
ClientArgumentsBuilder = { + this.copy(userArgs = this.userArgs :+ Validate.notBlank(userArg)) + } + + def addJar(jar: String): ClientArgumentsBuilder = { + this.copy(jars = this.jars :+ Validate.notBlank(jar)) + } + + def addDriverExtraClassPath(driverExtraClassPathEntry: String): ClientArgumentsBuilder = { + this.copy(driverExtraClassPath = this.driverExtraClassPath + :+ Validate.notBlank(driverExtraClassPathEntry)) + } + + def addSparkConf(conf: String, value: String): ClientArgumentsBuilder = { + this.copy(sparkConf = this.sparkConf + (Validate.notBlank(conf) -> Validate.notBlank(value))) + } + + def addDriverPort(portName: String, value: Int): ClientArgumentsBuilder = { + Validate.isTrue(value > 0, s"Port with name $portName must be positive, got: $value") + this.copy(additionalDriverPorts = this.additionalDriverPorts + + (Validate.notBlank(portName) -> value)) + } + + def driverDockerImage(driverDockerImage: String): ClientArgumentsBuilder = { + this.copy(driverDockerImage = Some(Validate.notBlank(driverDockerImage))) + } + + def executorDockerImage(executorDockerImage: String): ClientArgumentsBuilder = { + this.copy(executorDockerImage = Some(Validate.notBlank(executorDockerImage))) + } + + def customExecutorSpecFile(customExecutorSpecFile: String): ClientArgumentsBuilder = { + this.copy(customExecutorSpecFile = Some(Validate.notBlank(customExecutorSpecFile))) + } + + def customExecutorSpecContainerName(customExecutorSpecContainerName: String) + : ClientArgumentsBuilder = { + this.copy(customExecutorSpecContainerName = + Some(Validate.notBlank(customExecutorSpecContainerName))) + } + + def kubernetesMaster(kubernetesMaster: String): ClientArgumentsBuilder = { + this.copy(kubernetesMaster = Some(Validate.notBlank(kubernetesMaster))) + } + + def kubernetesAppNamespace(kubernetesAppNamespace: String): ClientArgumentsBuilder = { + this.copy(kubernetesAppNamespace = Some(Validate.notBlank(kubernetesAppNamespace))) + } + + def kubernetesClientCertFile(kubernetesClientCertFile: String): ClientArgumentsBuilder = { + this.copy(kubernetesClientCertFile = Some(Validate.notBlank(kubernetesClientCertFile))) + } + + def kubernetesClientKeyFile(kubernetesClientKeyFile: String): ClientArgumentsBuilder = { + this.copy(kubernetesClientKeyFile = Some(Validate.notBlank(kubernetesClientKeyFile))) + } + + def kubernetesCaCertFile(kubernetesCaCertFile: String): ClientArgumentsBuilder = { + this.copy(kubernetesCaCertFile = Some(Validate.notBlank(kubernetesCaCertFile))) + } + + def fromArgsArray(inputArgs: Array[String]): ClientArgumentsBuilder = { + var args = inputArgs.toList + var currentBuilder = this + while (args.nonEmpty) { + currentBuilder = args match { + case (KUBERNETES_APP_NAME) :: value :: tail => + args = tail + currentBuilder.kubernetesAppName(value) + + case (CLASS) :: value :: tail => + args = tail + currentBuilder.userMainClass(value) + + case (CONF) :: value :: tail => + args = tail + currentBuilder.addSparkConf(value.split("=")(0), value.split("=")(1)) + + case (JARS) :: value :: tail => + args = tail + for (jar <- value.split(",")) { + currentBuilder = currentBuilder.addJar(jar) + } + currentBuilder + + case "--arg" :: value :: tail => + args = tail + currentBuilder.addUserArg(value) + + case (KUBERNETES_EXPOSE_DRIVER_PORT) :: value :: tail => + args = tail + currentBuilder.addDriverPort( + value.split("=")(0), value.split("=")(1).toInt) + + case (DRIVER_CLASS_PATH) :: value :: tail => + args = tail + for (entry <- value.split(",")) { + currentBuilder = 
currentBuilder.addDriverExtraClassPath(entry) + } + currentBuilder + + case (KUBERNETES_DRIVER_DOCKER_IMAGE) :: value :: tail => + args = tail + currentBuilder.driverDockerImage(value) + + case (KUBERNETES_EXECUTOR_DOCKER_IMAGE) :: value :: tail => + args = tail + currentBuilder.executorDockerImage(value) + + case (KUBERNETES_CUSTOM_EXECUTOR_SPEC_FILE) :: value :: tail => + args = tail + currentBuilder.customExecutorSpecFile(value) + + case (KUBERNETES_CUSTOM_EXECUTOR_SPEC_CONTAINER_NAME) :: value :: tail => + args = tail + currentBuilder.customExecutorSpecContainerName(value) + + case (KUBERNETES_MASTER) :: value :: tail => + args = tail + currentBuilder.kubernetesMaster(value) + + case (KUBERNETES_APP_NAMESPACE) :: value :: tail => + args = tail + currentBuilder.kubernetesAppNamespace(value) + + case (KUBERNETES_CA_CERT_FILE) :: value :: tail => + args = tail + currentBuilder.kubernetesCaCertFile(value) + + case (KUBERNETES_CLIENT_CERT_FILE) :: value :: tail => + args = tail + currentBuilder.kubernetesClientCertFile(value) + + case (KUBERNETES_CLIENT_KEY_FILE) :: value :: tail => + args = tail + currentBuilder.kubernetesClientKeyFile(value) + + case Nil => currentBuilder + + case _ => + throw new IllegalArgumentException(getUsageMessage(args)) + } + } + currentBuilder + } + + def build(): ClientArguments = { + val withSystemProperties = withLoadedFromSparkConf() + validateOptions( + (withSystemProperties.userMainClass, "Must specify a main class to run"), + (withSystemProperties.kubernetesMaster, "Must specify a Kubernetes master.")) + ClientArguments( + kubernetesAppName = withSystemProperties.kubernetesAppName, + userMainClass = withSystemProperties.userMainClass.get, + userArgs = withSystemProperties.userArgs.toArray, + jars = withSystemProperties.jars.toArray, + additionalDriverPorts = withSystemProperties.additionalDriverPorts, + sparkConf = withSystemProperties.sparkConf, + driverExtraClassPath = withSystemProperties.driverExtraClassPath.toArray, + // TODO use specific versions + driverDockerImage = withSystemProperties.driverDockerImage.getOrElse("spark-driver:latest"), + executorDockerImage = withSystemProperties + .executorDockerImage + .getOrElse("spark-executor:latest"), + customExecutorSpecFile = withSystemProperties.customExecutorSpecFile, + customExecutorSpecContainerName = withSystemProperties.customExecutorSpecContainerName, + kubernetesMaster = withSystemProperties.kubernetesMaster.get, + kubernetesAppNamespace = withSystemProperties.kubernetesAppNamespace.getOrElse("default"), + kubernetesClientCertFile = withSystemProperties.kubernetesClientCertFile, + kubernetesClientKeyFile = withSystemProperties.kubernetesClientKeyFile, + kubernetesCaCertFile = withSystemProperties.kubernetesCaCertFile) + } + + private def withLoadedFromSparkConf(): ClientArgumentsBuilder = { + val sysPropsSparkConf = new SparkConf(true) + val sparkConfKubernetesAppName = sysPropsSparkConf.getOption("spark.kubernetes.app.name") + val sparkConfKubernetesMaster = sysPropsSparkConf.getOption("spark.kubernetes.master") + val sparkConfKubernetesNamespace = sysPropsSparkConf.getOption("spark.kubernetes.namespace") + val sparkConfKubernetesClientCertFile = sysPropsSparkConf + .getOption("spark.kubernetes.client.cert.file") + val sparkConfKubernetesClientKeyFile = sysPropsSparkConf + .getOption("spark.kubernetes.client.key.file") + val sparkConfKubernetesCaCertFile = sysPropsSparkConf.getOption("spark.kubernetes.ca.cert.file") + val sparkConfExecutorCustomSpecFile = sysPropsSparkConf + 
      .getOption("spark.kubernetes.executor.custom.spec.file")
+    val sparkConfExecutorCustomSpecContainerName = sysPropsSparkConf
+      .getOption("spark.kubernetes.executor.custom.spec.container.name")
+    val sparkConfExecutorDockerImage = sysPropsSparkConf
+      .getOption("spark.kubernetes.executor.docker.image")
+    val resolvedSparkConf = sysPropsSparkConf.getAll.toMap ++ sparkConf
+    val resolvedJars = if (jars.isEmpty) {
+      sysPropsSparkConf
+        .getOption("spark.jars")
+        .map(_.split(",").toSeq)
+        .getOrElse(Seq.empty[String])
+    } else {
+      jars
+    }
+    val resolvedDriverExtraClassPath = if (driverExtraClassPath.isEmpty) {
+      sysPropsSparkConf
+        .getOption("spark.driver.extraClassPath")
+        .map(_.split(",").toSeq)
+        .getOrElse(Seq.empty[String])
+    } else {
+      driverExtraClassPath
+    }
+    copy(kubernetesAppName = kubernetesAppName.orElse(sparkConfKubernetesAppName),
+      jars = resolvedJars,
+      driverExtraClassPath = resolvedDriverExtraClassPath,
+      executorDockerImage = executorDockerImage.orElse(sparkConfExecutorDockerImage),
+      kubernetesMaster = kubernetesMaster.orElse(sparkConfKubernetesMaster),
+      kubernetesAppNamespace = kubernetesAppNamespace.orElse(sparkConfKubernetesNamespace),
+      sparkConf = resolvedSparkConf.toMap,
+      customExecutorSpecFile = customExecutorSpecFile.orElse(sparkConfExecutorCustomSpecFile),
+      customExecutorSpecContainerName = customExecutorSpecContainerName
+        .orElse(sparkConfExecutorCustomSpecContainerName),
+      kubernetesClientCertFile = kubernetesClientCertFile.orElse(sparkConfKubernetesClientCertFile),
+      kubernetesClientKeyFile = kubernetesClientKeyFile.orElse(sparkConfKubernetesClientKeyFile),
+      kubernetesCaCertFile = kubernetesCaCertFile.orElse(sparkConfKubernetesCaCertFile))
+  }
+
+  private def validateOptions(opts: (Option[String], String)*): Unit = {
+    opts.foreach(opt => assert(opt._1.isDefined, opt._2))
+  }
+
+  private def getUsageMessage(unknownParam: List[String] = null): String = {
+    if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
+  }
+}
diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala
new file mode 100644
index 0000000000000..e4e4054cac941
--- /dev/null
+++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.spark.deploy.kubernetes + +import java.io.File + +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} +import scala.io.Source + +private[spark] object KubernetesClientBuilder { + private val API_SERVER_TOKEN = new File("/var/run/secrets/kubernetes.io/serviceaccount/token") + private val CA_CERT_FILE = new File("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") + + /** + * Creates a {@link KubernetesClient}, expecting to be from + * within the context of a pod. When doing so, credentials files + * are picked up from canonical locations, as they are injected + * into the pod's disk space. + */ + def buildFromWithinPod( + kubernetesMaster: String, + kubernetesNamespace: String): DefaultKubernetesClient = { + var clientConfigBuilder = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(kubernetesMaster) + .withNamespace(kubernetesNamespace) + + if (CA_CERT_FILE.isFile) { + clientConfigBuilder = clientConfigBuilder.withCaCertFile(CA_CERT_FILE.getAbsolutePath) + } + + if (API_SERVER_TOKEN.isFile) { + clientConfigBuilder = clientConfigBuilder.withOauthToken( + Source.fromFile(API_SERVER_TOKEN).mkString) + } + new DefaultKubernetesClient(clientConfigBuilder.build) + + } +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Retry.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Retry.scala new file mode 100644 index 0000000000000..e5ce0bcd606b2 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Retry.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration + +private[spark] object Retry { + + private def retryableFuture[T] + (times: Int, interval: Duration) + (f: => Future[T]) + (implicit executionContext: ExecutionContext): Future[T] = { + f recoverWith { + case _ if times > 0 => { + Thread.sleep(interval.toMillis) + retryableFuture(times - 1, interval)(f) + } + } + } + + def retry[T] + (times: Int, interval: Duration) + (f: => T) + (implicit executionContext: ExecutionContext): Future[T] = { + retryableFuture(times, interval)(Future[T] { f }) + } +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesDriverLauncherServer.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesDriverLauncherServer.scala new file mode 100644 index 0000000000000..512e46d8640b4 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesDriverLauncherServer.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.driverlauncher + +import java.io.File + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.google.common.io.Files +import org.apache.commons.codec.binary.Base64 +import org.eclipse.jetty.server.{Server, ServerConnector} +import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} +import org.eclipse.jetty.util.thread.QueuedThreadPool +import org.glassfish.jersey.server.ResourceConfig +import org.glassfish.jersey.servlet.ServletContainer + +/** + * REST server responsible for helping a Kubernetes-backed Spark application + * bootstrap its dependencies. This is expected to be run in a Kubernetes pod + * and have environment variables bootstrapped to it. 
+*/ +private[spark] class KubernetesDriverLauncherServer { + + private val port = System.getenv("SPARK_DRIVER_LAUNCHER_SERVER_PORT") + assert(port != null, "SPARK_DRIVER_LAUNCHER_SERVER_PORT is not set.") + private val secretFile = System.getenv("SPARK_APP_SECRET_LOCATION") + assert(secretFile != null, "SPARK_APP_SECRET_LOCATION is not set.") + private var jettyServer: Server = null + + // TODO(mcheah) use SSL + private def startServer(): Server = { + val threadPool = new QueuedThreadPool + threadPool.setDaemon(true) + jettyServer = new Server(threadPool) + val connector = new ServerConnector(jettyServer) + connector.setPort(port.toInt) + jettyServer.addConnector(connector) + val mainHandler = new ServletContextHandler + val resourceConfig = new ResourceConfig() + val applicationSecret = Base64.encodeBase64String(Files.toByteArray(new File(secretFile))) + resourceConfig.registerInstances( + new KubernetesDriverLauncherServiceImpl(applicationSecret, this) + .asInstanceOf[KubernetesDriverLauncherService], + new JacksonJsonProvider(new ObjectMapper().registerModule(new DefaultScalaModule))) + mainHandler.setContextPath("/") + val servletHolder = new ServletHolder("main", new ServletContainer(resourceConfig)) + mainHandler.addServlet(servletHolder, "/api/*") + jettyServer.setHandler(mainHandler) + jettyServer.start() + jettyServer + } + + def stop(): Unit = { + jettyServer.stop() + } +} + +private[spark] object KubernetesDriverLauncherServer { + + def main(argStrings: Array[String]): Unit = { + val server = new KubernetesDriverLauncherServer().startServer() + server.join() + } + +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesDriverLauncherService.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesDriverLauncherService.scala new file mode 100644 index 0000000000000..5a35880cb6413 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesDriverLauncherService.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.driverlauncher + +import javax.ws.rs.{Consumes, GET, HeaderParam, Path, POST, Produces} +import javax.ws.rs.core.{HttpHeaders, MediaType} + +@Path("/kubernetes-driver-launcher-service") +private[spark] trait KubernetesDriverLauncherService { + + @POST + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_JSON)) + @Path("/submitApplication") + def submitApplication( + @HeaderParam("X-" + HttpHeaders.AUTHORIZATION) applicationSecret: String, + applicationConfiguration: KubernetesSparkDriverConfiguration): Unit + + @GET + @Produces(Array(MediaType.TEXT_PLAIN)) + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Path("/ping") + def ping(): String +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesDriverLauncherServiceImpl.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesDriverLauncherServiceImpl.scala new file mode 100644 index 0000000000000..de0d8298ca358 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesDriverLauncherServiceImpl.scala @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.driverlauncher + +import java.io.File +import java.net.URI +import java.nio.file.Paths +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicBoolean +import javax.ws.rs.NotAuthorizedException + +import com.google.common.io.Files +import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.io.FileUtils +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.util.{ShutdownHookManager, Utils} + +import scala.concurrent.{Future, ExecutionContext} + +private[spark] class KubernetesDriverLauncherServiceImpl( + private val expectedApplicationSecret: String, + private val server: KubernetesDriverLauncherServer +) extends KubernetesDriverLauncherService with Logging { + + private val uploadedJarsFolder = Files.createTempDir() + private val javaExecutable = s"${System.getenv("JAVA_HOME")}/bin/java" + private val sparkHome = System.getenv("SPARK_HOME") + private implicit val waitForApplicationExecutor = ExecutionContext.fromExecutorService( + Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat("wait-for-application-%d") + .setDaemon(true) + .build())) + private var applicationCompleteFuture: Future[_] = null + private var submitComplete = false + + override def submitApplication( + applicationSecret: String, + applicationConfiguration: KubernetesSparkDriverConfiguration): Unit = synchronized { + if (!expectedApplicationSecret.equals(applicationSecret)) { + throw new NotAuthorizedException("Unauthorized to run application.") + } + if (submitComplete) { + throw new IllegalStateException("Application is already started, cannot submit again.") + } + + val usedFileNames = new mutable.HashSet[String] + val localJarPaths = new mutable.HashMap[String, String] + + for (jar <- applicationConfiguration.localJarsContents) { + assert(jar._1.startsWith("file:"), "Uploaded jars should only contain local files") + var fileName = new File(URI.create(jar._1).getPath).getName + var fileNameSuffix = 1 + while (usedFileNames.contains(fileName)) { + fileName = s"${Files.getNameWithoutExtension(fileName)}" + + s"-$fileNameSuffix${Files.getFileExtension(fileName)}" + fileNameSuffix += 1 + } + usedFileNames.add(fileName) + + val writtenJarFile = new File(uploadedJarsFolder, fileName) + val decodedJarBytes = Base64.decodeBase64(jar._2) + Files.write(decodedJarBytes, writtenJarFile) + localJarPaths.put(jar._1, writtenJarFile.getAbsolutePath) + } + + applicationConfiguration + .driverClassPath + .map(Utils.resolveURI) + .foreach(uri => assert(uri.getScheme == "local" || uri.getScheme == "file")) + + val driverClassPath = getDriverClassPath(applicationConfiguration, localJarPaths) + + val command = new ArrayBuffer[String] + command += javaExecutable + + if (applicationConfiguration.jars.nonEmpty) { + val sparkJars = applicationConfiguration.jars.map(localJarPaths(_)).mkString(",") + command += s"-Dspark.jars=$sparkJars" + } + + applicationConfiguration.customExecutorSpecBase64.foreach(customExecutorSpecBase64 => { + val customExecutorSpecBytes = Base64.decodeBase64(customExecutorSpecBase64) + val replicationControllerSpecFile = new File( + Files.createTempDir(), "executor-replication-controller") + FileUtils.writeByteArrayToFile(replicationControllerSpecFile, customExecutorSpecBytes) + command += "-Dspark.kubernetes.executor.custom.spec.file=" + + 
s"${replicationControllerSpecFile.getAbsolutePath}" + }) + applicationConfiguration.customExecutorSpecContainerName.foreach(name => { + command += s"-Dspark.kubernetes.executor.custom.spec.container.name=$name" + }) + command += "-cp" + command += driverClassPath.mkString(":") + for (prop <- applicationConfiguration.sparkConf) { + // Have to ignore spark.jars and driver extra class path since they were loaded + // differently above + if (prop._1 != "spark.jars" && prop._1 != "spark.driver.extraClassPath") { + command += s"-D${prop._1}=${prop._2}" + } + } + + if (applicationConfiguration.sparkConf.getOrElse("spark.authenticate", "false").toBoolean) { + applicationConfiguration.sparkConf.get("spark.authenticate.secret") match { + case None => + command += s"-Dspark.authenticate.secret=$expectedApplicationSecret" + case Some(_) => logInfo("Using user-provided secret.") + } + } + + if (applicationConfiguration.sparkConf.contains("spark.driver.memory")) { + command += s"-Xmx${applicationConfiguration.sparkConf("spark.driver.memory")}" + } + + command += applicationConfiguration.driverMainClass + command ++= applicationConfiguration.driverArgs + + val pb = new ProcessBuilder(command: _*) + Paths.get(sparkHome, "logs").toFile.mkdirs + pb.redirectOutput(Paths.get(sparkHome, "logs", "stdout").toFile) + pb.redirectError(Paths.get(sparkHome, "logs", "stderr").toFile) + val process = pb.start() + submitComplete = true + ShutdownHookManager.addShutdownHook(() => { + logInfo("Received stop command, shutting down the running Spark application...") + process.destroy() + }) + applicationCompleteFuture = Future[Unit] { + try { + process.waitFor() + } finally { + server.stop() + } + } + } + + override def ping(): String = "pong" + + private def getDriverClassPath( + applicationConfiguration: KubernetesSparkDriverConfiguration, + localJarPaths: mutable.HashMap[String, String]): mutable.Buffer[String] = { + val driverClassPath = new ArrayBuffer[String] + val jarsDir = new File(sparkHome, "jars") + jarsDir.listFiles().foreach { + driverClassPath += _.getAbsolutePath + } + applicationConfiguration.driverClassPath.foreach { + driverClassPath += localJarPaths(_) + } + applicationConfiguration.jars.foreach { + driverClassPath += localJarPaths(_) + } + driverClassPath + } +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesSparkDriverConfiguration.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesSparkDriverConfiguration.scala new file mode 100644 index 0000000000000..bc64cded92baa --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/driverlauncher/KubernetesSparkDriverConfiguration.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.driverlauncher + +private[spark] case class KubernetesSparkDriverConfiguration( + localJarsContents: Map[String, String], + driverClassPath: Seq[String], + jars: Seq[String], + sparkConf: Map[String, String], + driverArgs: Seq[String], + customExecutorSpecBase64: Option[String], + customExecutorSpecContainerName: Option[String], + driverMainClass: String) diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/httpclients/HttpClientUtil.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/httpclients/HttpClientUtil.scala new file mode 100644 index 0000000000000..ecd685553a5dc --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/httpclients/HttpClientUtil.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.httpclients + +import javax.net.ssl.{SSLContext, SSLSocketFactory, X509TrustManager} +import javax.ws.rs.core.MediaType + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.google.common.net.HttpHeaders +import feign.Feign +import feign.codec.{Decoder, StringDecoder} +import feign.jackson.{JacksonDecoder, JacksonEncoder} +import feign.jaxrs.JAXRSContract +import feign.Request.Options +import okhttp3.OkHttpClient +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.spark.internal.Logging +import org.apache.spark.status.api.v1.JacksonMessageWriter + +private[spark] object HttpClientUtil extends Logging { + private val STRING_DECODER = new StringDecoder + + def createClient[T: ClassTag]( + uri: String, + sslSocketFactory: SSLSocketFactory = SSLContext.getDefault.getSocketFactory, + trustContext: X509TrustManager = null, + readTimeoutMillis: Int = 20000, + connectTimeoutMillis: Int = 20000): T = { + var httpClientBuilder = new OkHttpClient.Builder() + Option.apply(trustContext).foreach(context => { + httpClientBuilder = httpClientBuilder.sslSocketFactory(sslSocketFactory, context) + }) + val objectMapper = new ObjectMapper() + .registerModule(new DefaultScalaModule) + .setDateFormat(JacksonMessageWriter.makeISODateFormat) + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] + Feign.builder() + .client(new feign.okhttp.OkHttpClient(httpClientBuilder.build())) + .contract(new JAXRSContract) + .encoder(new JacksonEncoder(objectMapper)) + .decoder(new TextDelegateDecoder(new JacksonDecoder(objectMapper))) + 
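+ // TextDelegateDecoder (defined below) returns text/plain responses, such as ping, as raw
+ // strings and delegates all other responses to the JSON decoder.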
.options(new Options(connectTimeoutMillis, readTimeoutMillis)) + .target(clazz, uri) + } + + private class TextDelegateDecoder(private val delegate: Decoder) extends Decoder { + + override def decode(response: feign.Response, t: java.lang.reflect.Type) : AnyRef = { + val contentTypes: Iterable[String] = response + .headers() + .asScala + .mapValues(_.asScala) + .filterKeys(_.equalsIgnoreCase(HttpHeaders.CONTENT_TYPE)) + .values + .flatten + + // Use string decoder only if we're given exactly the text/plain content type + if (contentTypes.size == 1 && contentTypes.head.startsWith(MediaType.TEXT_PLAIN)) { + STRING_DECODER.decode(response, t) + } else { + delegate.decode(response, t) + } + } + } +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/shuffle/KubernetesShuffleService.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/shuffle/KubernetesShuffleService.scala new file mode 100644 index 0000000000000..75c40749e4717 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/shuffle/KubernetesShuffleService.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.shuffle + +import java.util.concurrent.CountDownLatch + +import org.apache.commons.lang3.Validate + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.ExternalShuffleService +import org.apache.spark.internal.Logging +import org.apache.spark.launcher.SparkSubmitArgumentsParser +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler +import org.apache.spark.network.util.TransportConf +import org.apache.spark.util.{ShutdownHookManager, Utils} + +private case class KubernetesShuffleServiceArguments( + kubernetesMaster: String, + secretsNamespace: String) + +private case class KubernetesShuffleServiceArgumentsBuilder( + kubernetesMaster: Option[String] = None, + secretsNamespace: Option[String] = None) { + def build(): KubernetesShuffleServiceArguments = { + val resolvedKubernetesMaster = kubernetesMaster.getOrElse( + throw new IllegalArgumentException("Kubernetes master must be specified.")) + val resolvedSecretsNamespace = secretsNamespace.getOrElse( + throw new IllegalArgumentException("Secrets namespace must be specified.")) + KubernetesShuffleServiceArguments(resolvedKubernetesMaster, resolvedSecretsNamespace) + } +} + +private object KubernetesShuffleServiceArgumentsBuilder extends SparkSubmitArgumentsParser { + def fromArgsArray(argStrings: Array[String]): KubernetesShuffleServiceArguments = { + var currentBuilder = new KubernetesShuffleServiceArgumentsBuilder() + var args = argStrings.toList + while (args.nonEmpty) { + currentBuilder = args match { + case KUBERNETES_MASTER :: value :: tail => + args = tail + currentBuilder.copy( + kubernetesMaster = Some(Validate.notBlank(value, + "Kubernetes master must not be empty."))) + case "--shuffle-service-secrets-namespace" :: value :: tail => + args = tail + currentBuilder.copy( + secretsNamespace = Some(Validate.notBlank(value, + "Secrets namespace must not be empty."))) + case Nil => currentBuilder + case _ => + // TODO fill in usage message + throw new IllegalArgumentException("Unsupported parameter") + } + } + currentBuilder.build() + } +} + +private[spark] class KubernetesShuffleService( + sparkConf: SparkConf, + securityManager: KubernetesShuffleServiceSecurityManager) + extends ExternalShuffleService(sparkConf, securityManager) { + + override protected def newShuffleBlockHandler( + conf: TransportConf): ExternalShuffleBlockHandler = { + new KubernetesShuffleBlockHandler(conf, null, securityManager) + } + + override def stop(): Unit = { + try { + super.stop() + } catch { + case e: Throwable => logError("Error stopping shuffle service.", e) + } + securityManager.stop() + } +} + +private[spark] object KubernetesShuffleService extends Logging { + private val barrier = new CountDownLatch(1) + + def main(args: Array[String]): Unit = { + Utils.initDaemon(log) + val sparkConf = new SparkConf + Utils.loadDefaultSparkProperties(sparkConf) + sparkConf.set("spark.shuffle.service.enabled", "true") + sparkConf.set("spark.authenticate.secret", "unused") + val parsedArgs = KubernetesShuffleServiceArgumentsBuilder.fromArgsArray(args) + val securityManager = new KubernetesShuffleServiceSecurityManager( + kubernetesMaster = parsedArgs.kubernetesMaster, + shuffleServiceSecretsNamespace = parsedArgs.secretsNamespace, + sparkConf = sparkConf) + val server = new KubernetesShuffleService(sparkConf, securityManager) + server.start() + logDebug("Adding shutdown hook") // force eager creation of logger + ShutdownHookManager.addShutdownHook { () => + logInfo("Shutting down shuffle 
service.") + server.stop() + barrier.countDown() + } + // keep running until the process is terminated + barrier.await() + + } +} \ No newline at end of file diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/shuffle/KubernetesShuffleServiceSecurityManager.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/shuffle/KubernetesShuffleServiceSecurityManager.scala new file mode 100644 index 0000000000000..0d12408f31d0d --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/shuffle/KubernetesShuffleServiceSecurityManager.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.shuffle + +import com.google.common.base.Charsets +import org.apache.commons.codec.binary.Base64 +import scala.collection.JavaConverters._ + +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder + +private[spark] class KubernetesShuffleServiceSecurityManager( + private val shuffleServiceSecretsNamespace: String, + private val kubernetesMaster: String, + private val sparkConf: SparkConf) extends SecurityManager(sparkConf) { + + private val kubernetesClient = KubernetesClientBuilder.buildFromWithinPod( + kubernetesMaster, shuffleServiceSecretsNamespace) + + def stop(): Unit = { + try { + kubernetesClient.close() + } catch { + case e: Throwable => logError("Failed to shut down kubernetes client.", e) + } + } + + override def getSecretKey(appId: String): String = { + new String(Base64.decodeBase64(kubernetesClient.secrets() + .withName(s"spark-secret-$appId") + .get() + .getData + .asScala + .getOrElse("spark-shuffle-secret", throw new IllegalArgumentException( + s"Expected spark-shuffle-secret to be registered for app $appId"))), Charsets.UTF_8) + } + + def applicationComplete(appId: String): Unit = { + try { + kubernetesClient.secrets().withName(secretNameForApp(appId)).delete() + } catch { + case e: Throwable => logError(s"Failed to delete secret for app $appId; perhaps auth" + + s" was not enabled?", e) + } + } + + private def secretNameForApp(appId: String) = s"spark-secret-$appId" +} \ No newline at end of file diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/shuffle/StartKubernetesShuffleService.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/shuffle/StartKubernetesShuffleService.scala new file mode 100644 index 0000000000000..74c45dc36be49 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/shuffle/StartKubernetesShuffleService.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.shuffle + +import java.util.UUID + +import io.fabric8.kubernetes.api.model.QuantityBuilder +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} +import org.apache.commons.lang3.Validate +import scala.collection.JavaConverters._ + +import org.apache.spark.launcher.SparkSubmitArgumentsParser +import org.apache.spark.SPARK_VERSION +import org.apache.spark.util.Utils + +private[spark] case class StartKubernetesShuffleServiceArguments( + kubernetesMaster: String, + shuffleServiceNamespace: String, + shuffleServiceDaemonSetName: String, + shuffleServicePort: Int, + shuffleServiceMemory: String, + shuffleServiceDockerImage: String, + shuffleHostPathDir: String, + shuffleServiceAuthEnabled: Boolean, + kubernetesClientCertFile: Option[String] = None, + kubernetesClientKeyFile: Option[String] = None, + kubernetesCaCertFile: Option[String] = None) + +private[spark] case class StartKubernetesShuffleServiceArgumentsBuilder( + kubernetesMaster: Option[String] = None, + shuffleServiceNamespace: Option[String] = None, + shuffleServiceDaemonSetName: String = "spark-shuffle-service", + shuffleServicePort: Int = 7337, + shuffleServiceMemory: String = "1g", + shuffleHostPathDir: String = "/tmp", + shuffleServiceAuthEnabled: Boolean = false, + shuffleServiceDockerImage: String = s"spark-shuffle-service:$SPARK_VERSION", + kubernetesClientCertFile: Option[String] = None, + kubernetesClientKeyFile: Option[String] = None, + kubernetesCaCertFile: Option[String] = None) { + + def build(): StartKubernetesShuffleServiceArguments = { + val resolvedMaster = kubernetesMaster.getOrElse( + throw new IllegalArgumentException("Must specify kubernetes master")) + val resolvedNamespace = shuffleServiceNamespace.getOrElse( + throw new IllegalArgumentException("Must specify namespace")) + StartKubernetesShuffleServiceArguments( + kubernetesMaster = resolvedMaster, + shuffleServiceNamespace = resolvedNamespace, + shuffleServiceDaemonSetName = shuffleServiceDaemonSetName, + shuffleServicePort = shuffleServicePort, + shuffleServiceMemory = shuffleServiceMemory, + shuffleHostPathDir = shuffleHostPathDir, + shuffleServiceDockerImage = shuffleServiceDockerImage, + shuffleServiceAuthEnabled = shuffleServiceAuthEnabled, + kubernetesClientCertFile = kubernetesClientCertFile, + kubernetesClientKeyFile = kubernetesClientKeyFile, + kubernetesCaCertFile = kubernetesCaCertFile) + } +} + +private[spark] object StartKubernetesShuffleServiceArgumentsBuilder + extends SparkSubmitArgumentsParser { + + def fromArgsArray(argStrings: Array[String]): StartKubernetesShuffleServiceArguments = { + var args = argStrings.toList + var currentBuilder = new StartKubernetesShuffleServiceArgumentsBuilder() + while (args.nonEmpty) { + currentBuilder = args match { + case KUBERNETES_MASTER :: value :: tail 
=> + args = tail + currentBuilder.copy( + kubernetesMaster = Some(Validate.notBlank(value, + "Kubernetes master must not be empty."))) + case "--namespace" :: value :: tail => + args = tail + currentBuilder.copy( + shuffleServiceNamespace = Some(Validate.notBlank(value, + "Namespace must not be empty."))) + case "--daemon-set-name" :: value :: tail => + args = tail + currentBuilder.copy( + shuffleServiceDaemonSetName = Validate.notBlank(value, + "Daemon set name must not be empty.")) + case "--port" :: value :: tail => + args = tail + currentBuilder.copy( + shuffleServicePort = Validate.notBlank(value, "Port must not be empty.").toInt) + case "--memory" :: value :: tail => + args = tail + currentBuilder.copy( + shuffleServiceMemory = Validate.notBlank(value, "Memory must not be empty.")) + case "--shuffle-service-host-directory" :: value :: tail => + args = tail + currentBuilder.copy(shuffleHostPathDir = Validate.notBlank(value, + "Shuffle host directory must not be empty.")) + case "--enable-auth" :: value :: tail => + args = tail + currentBuilder.copy(shuffleServiceAuthEnabled = value.toBoolean) + case KUBERNETES_CLIENT_KEY_FILE :: value :: tail => + args = tail + currentBuilder.copy( + kubernetesClientKeyFile = Some(Validate.notBlank(value, + "Client key file must not be empty."))) + case KUBERNETES_CLIENT_CERT_FILE :: value :: tail => + args = tail + currentBuilder.copy( + kubernetesClientCertFile = Some(Validate.notBlank(value, + "Client cert file must not be empty."))) + case KUBERNETES_CA_CERT_FILE :: value :: tail => + args = tail + currentBuilder.copy( + kubernetesCaCertFile = Some(Validate.notBlank(value, + "Ca cert file must not be empty."))) + case Nil => currentBuilder + case _ => + // TODO fill in usage message + throw new IllegalArgumentException("Unsupported parameter") + } + } + currentBuilder.build() + } +} +private[spark] class StartKubernetesShuffleService { + import StartKubernetesShuffleService._ + + def run(parsedArguments: StartKubernetesShuffleServiceArguments): Unit = { + var clientConfigBuilder = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(parsedArguments.kubernetesMaster) + .withNamespace(parsedArguments.shuffleServiceNamespace) + + parsedArguments.kubernetesCaCertFile.foreach(path => { + clientConfigBuilder = clientConfigBuilder.withCaCertFile(path) + }) + parsedArguments.kubernetesClientCertFile.foreach(path => { + clientConfigBuilder = clientConfigBuilder.withClientCertFile(path) + }) + parsedArguments.kubernetesClientKeyFile.foreach(path => { + clientConfigBuilder = clientConfigBuilder.withClientKeyFile(path) + }) + Utils.tryWithResource(new DefaultKubernetesClient(clientConfigBuilder.build())) { + kubernetesClient => + val shuffleServiceLabel = UUID.randomUUID.toString.replaceAll("-", "") + val selectors = Map(SHUFFLE_SERVICE_SELECTOR -> shuffleServiceLabel).asJava + val requestedShuffleServiceMemoryBytes = Utils.byteStringAsBytes( + parsedArguments.shuffleServiceMemory).toString + val shuffleServiceMemoryQuantity = new QuantityBuilder(false) + .withAmount(requestedShuffleServiceMemoryBytes) + .build() + kubernetesClient.extensions().daemonSets().createNew() + .withNewMetadata() + .withName(parsedArguments.shuffleServiceDaemonSetName) + .withLabels(selectors) + .endMetadata() + .withNewSpec() + .withNewSelector() + .withMatchLabels(selectors) + .endSelector() + .withNewTemplate() + .withNewMetadata() + .withName(parsedArguments.shuffleServiceDaemonSetName) + .withLabels(selectors) + .endMetadata() + .withNewSpec() + .addNewVolume() + 
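+ // The host path below is shared with executor pods, which use it as their local
+ // directory for shuffle files.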
.withName("shuffles-volume") + .withNewHostPath().withPath(parsedArguments.shuffleHostPathDir).endHostPath() + .endVolume() + .addNewContainer() + .withName(s"shuffle-service-container") + .withImage(parsedArguments.shuffleServiceDockerImage) + .withImagePullPolicy("IfNotPresent") + .addNewPort().withContainerPort(parsedArguments.shuffleServicePort).endPort() + .addNewEnv() + .withName("SPARK_SHUFFLE_SERVICE_PORT") + .withValue(parsedArguments.shuffleServicePort.toString) + .endEnv() + .addNewEnv() + .withName("SPARK_SHUFFLE_SERVICE_MEMORY") + .withValue(parsedArguments.shuffleServiceMemory) + .endEnv() + .addNewEnv() + .withName("KUBERNETES_MASTER") + .withValue(parsedArguments.kubernetesMaster) + .endEnv() + .addNewEnv() + .withName("SHUFFLE_SERVICE_SECRETS_NAMESPACE") + .withValue(parsedArguments.shuffleServiceNamespace) + .endEnv() + .addNewEnv() + .withName("SPARK_AUTH_ENABLED") + .withValue(parsedArguments.shuffleServiceAuthEnabled.toString) + .endEnv() + .withNewResources() + .addToRequests("memory", shuffleServiceMemoryQuantity) + .endResources() + .addNewVolumeMount() + .withName("shuffles-volume") + .withMountPath(parsedArguments.shuffleHostPathDir) + .withReadOnly(true) + .endVolumeMount() + .endContainer() + .endSpec() + .endTemplate() + .endSpec() + .done() + } + } +} + +private[spark] object StartKubernetesShuffleService { + private val SHUFFLE_SERVICE_SELECTOR = "spark-shuffle-service" + def main(args: Array[String]): Unit = { + val parsedArguments = StartKubernetesShuffleServiceArgumentsBuilder.fromArgsArray(args) + new StartKubernetesShuffleService().run(parsedArguments) + } +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala new file mode 100644 index 0000000000000..0d3b97c636ca3 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.kubernetes + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} + +private[spark] class KubernetesClusterManager extends ExternalClusterManager { + + override def canCreate(masterURL: String): Boolean = masterURL.startsWith("kubernetes") + + override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { + val scheduler = new TaskSchedulerImpl(sc) + sc.taskScheduler = scheduler + scheduler + } + + override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler) + : SchedulerBackend = { + new KubernetesClusterSchedulerBackend(sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc) + } + + override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { + scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) + } + +} + diff --git a/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala new file mode 100644 index 0000000000000..fafaa807cbd43 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.kubernetes + +import java.io.{File, FileInputStream} +import java.util.UUID +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicInteger + +import com.google.common.base.Charsets +import com.google.common.util.concurrent.ThreadFactoryBuilder +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.extensions.DaemonSet +import org.apache.commons.codec.binary.Base64 +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.sasl.SecretKeyHolder +import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.{SecurityManager, SparkContext, SparkException} +import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveSparkProps +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( + scheduler: TaskSchedulerImpl, + val sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_MODIFICATION_LOCK = new Object + private val runningExecutorPods = new mutable.HashMap[String, Pod] + + private val kubernetesMaster = conf + .getOption("spark.kubernetes.master") + .getOrElse( + throw new SparkException("Kubernetes master must be specified in kubernetes mode.")) + + private val executorDockerImage = conf + .get("spark.kubernetes.executor.docker.image", s"spark-executor:${sc.version}") + + private val kubernetesNamespace = conf + .getOption("spark.kubernetes.namespace") + .getOrElse( + throw new SparkException("Kubernetes namespace must be specified in kubernetes mode.")) + + private val executorPort = conf.get("spark.executor.port", DEFAULT_STATIC_PORT.toString).toInt + + /** + * Allows for specifying a custom replication controller for the executor runtime. This should + * only be used if the user really knows what they are doing. Allows for custom behavior on the + * executors themselves, or for loading extra containers into the executor pods. 
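+ * The container named by spark.kubernetes.executor.custom.spec.container.name is augmented
+ * with the executor's required environment variables, ports, and shuffle volume mounts;
+ * other containers in the provided spec are left as-is.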
+ */ + private val executorCustomSpecFile = conf.getOption("spark.kubernetes.executor.custom.spec.file") + private val executorCustomSpecExecutorContainerName = executorCustomSpecFile.map(_ => + conf + .getOption("spark.kubernetes.executor.custom.spec.container.name") + .getOrElse(throw new SparkException("When using a custom replication controller spec" + + " for executors, the name of the container that the executor will run in must be" + + " specified via spark.kubernetes.executor.custom.spec.container.name"))) + + private val blockmanagerPort = conf + .get("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) + .toInt + + private val kubernetesDriverServiceName = conf + .getOption("spark.kubernetes.driver.service.name") + .getOrElse( + throw new SparkException("Must specify the service name the driver is running with")) + + private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g") + private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory) + + private val memoryOverheadBytes = conf + .getOption("spark.kubernetes.executor.memoryOverhead") + .map(overhead => Utils.byteStringAsBytes(overhead)) + .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt, + MEMORY_OVERHEAD_MIN)) + private val executorMemoryWithOverhead = executorMemoryBytes + memoryOverheadBytes + + private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1") + + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("kubernetes-executor-requests-%d") + .build)) + + private val kubernetesClient = KubernetesClientBuilder + .buildFromWithinPod(kubernetesMaster, kubernetesNamespace) + + private val externalShuffleServiceEnabled = conf.getBoolean( + "spark.shuffle.service.enabled", defaultValue = false) + + override val minRegisteredRatio = + if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 + } else { + super.minRegisteredRatio + } + + protected var totalExpectedExecutors = new AtomicInteger(0) + private val maybeShuffleService = if (externalShuffleServiceEnabled) { + val daemonSetName = conf + .getOption("spark.kubernetes.shuffle.service.daemonset.name") + .getOrElse(throw new IllegalArgumentException("When using the shuffle" + + " service, must specify the shuffle service daemon set name.") + ) + val daemonSetNamespace = conf + .getOption("spark.kubernetes.shuffle.service.daemonset.namespace") + .getOrElse(throw new IllegalArgumentException("When using the shuffle service," + + " must specify the shuffle service daemon set namespace.")) + Some(ShuffleServiceDaemonSetMetadata(daemonSetName, daemonSetNamespace)) + } else { + Option.empty[ShuffleServiceDaemonSetMetadata] + } + + private val driverUrl = RpcEndpointAddress( + System.getenv(s"${convertToEnvMode(kubernetesDriverServiceName)}_SERVICE_HOST"), + sc.getConf.get("spark.driver.port").toInt, + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private def convertToEnvMode(value: String): String = + value.toUpperCase.map { c => if (c == '-') '_' else c } + + private val initialExecutors = getInitialTargetExecutorNumber(1) + private val authEnabled = conf.getBoolean("spark.authenticate", defaultValue = false) + private val authSecret = if (authEnabled) { + conf.getOption("spark.authenticate.secret") + .getOrElse( + throw new IllegalArgumentException("No secret provided though spark.authenticate is true.")) + } else { + 
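+ // Authentication is disabled, so this placeholder secret is never consulted.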
"unused" + } + + private val kubernetesSecretName = s"spark-secret-${applicationId()}" + + override def sufficientResourcesRegistered(): Boolean = { + totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio + } + + override def start(): Unit = { + super.start() + setupAuth() + if (!Utils.isDynamicAllocationEnabled(sc.conf)) { + doRequestTotalExecutors(initialExecutors) + } + } + + private def setupAuth(): Unit = { + if (authEnabled) { + val baseSecret = new SecretBuilder() + .withNewMetadata() + .withName(kubernetesSecretName) + .withNamespace(kubernetesNamespace) + .endMetadata() + .withData(Map(SHUFFLE_SECRET_NAME -> + Base64.encodeBase64String(authSecret.getBytes(Charsets.UTF_8))).asJava) + .build() + kubernetesClient.secrets().create(baseSecret) + maybeShuffleService.foreach(service => { + if (service.daemonSetNamespace != kubernetesNamespace) { + val shuffleServiceNamespaceSecret = new SecretBuilder(baseSecret) + .editMetadata() + .withNamespace(service.daemonSetNamespace) + .endMetadata() + .build() + kubernetesClient + .secrets() + .inNamespace(service.daemonSetNamespace) + .create(shuffleServiceNamespaceSecret) + } + }) + } + } + + private def allocateNewExecutorPod(): (String, Pod) = { + val executorId = UUID.randomUUID().toString.replaceAll("-", "") + val name = s"exec$executorId" + val selectors = Map(SPARK_EXECUTOR_SELECTOR -> executorId, + SPARK_APP_SELECTOR -> applicationId()).asJava + val executorMemoryQuantity = new QuantityBuilder(false) + .withAmount(executorMemoryBytes.toString) + .build() + val executorMemoryLimitQuantity = new QuantityBuilder(false) + .withAmount(executorMemoryWithOverhead.toString) + .build() + val requiredEnv = new ArrayBuffer[EnvVar] + requiredEnv += new EnvVarBuilder() + .withName("SPARK_EXECUTOR_PORT") + .withValue(executorPort.toString) + .build() + requiredEnv += new EnvVarBuilder() + .withName("SPARK_DRIVER_URL") + .withValue(driverUrl) + .build() + requiredEnv += new EnvVarBuilder() + .withName("SPARK_EXECUTOR_CORES") + .withValue(executorCores) + .build() + requiredEnv += new EnvVarBuilder() + .withName("SPARK_EXECUTOR_MEMORY") + .withValue(executorMemory) + .build() + requiredEnv += new EnvVarBuilder() + .withName("SPARK_APPLICATION_ID") + .withValue(applicationId()) + .build() + requiredEnv += new EnvVarBuilder() + .withName("SPARK_EXECUTOR_ID") + .withValue(executorId) + .build() + requiredEnv += new EnvVarBuilder() + .withName("SPARK_AUTH_ENABLED") + .withValue(authEnabled.toString) + .build() + + if (authEnabled) { + requiredEnv += new EnvVarBuilder() + .withName(SecurityManager.ENV_AUTH_SECRET) + .withNewValueFrom() + .withNewSecretKeyRef(SHUFFLE_SECRET_NAME, kubernetesSecretName) + .endValueFrom() + .build() + } + + val shuffleServiceVolume = maybeShuffleService.map(service => { + val shuffleServiceDaemonSet = getShuffleServiceDaemonSet(service) + val shuffleDir = shuffleServiceDaemonSet + .getSpec + .getTemplate + .getSpec + .getVolumes + .asScala + .find(_.getName == "shuffles-volume") + .getOrElse(throw new IllegalStateException("Expected to find a host path" + + " shuffles-service volume.")) + new VolumeBuilder() + .withName("shuffles-volume") + .withNewHostPath().withPath(shuffleDir.getHostPath.getPath).endHostPath() + .build() + }) + + shuffleServiceVolume.foreach(volume => { + requiredEnv += new EnvVarBuilder() + .withName("SPARK_LOCAL_DIRS") + .withValue(volume.getHostPath.getPath) + .build() + }) + + val requiredPorts = new ArrayBuffer[ContainerPort] + requiredPorts += new ContainerPortBuilder() + 
.withName(EXECUTOR_PORT_NAME) + .withContainerPort(executorPort) + .build() + requiredPorts += new ContainerPortBuilder() + .withName(BLOCK_MANAGER_PORT_NAME) + .withContainerPort(blockmanagerPort) + .build() + executorCustomSpecFile match { + case Some(filePath) => + val file = new File(filePath) + if (!file.exists()) { + throw new SparkException(s"Custom executor spec file not found at $filePath") + } + val providedPodSpec = Utils.tryWithResource(new FileInputStream(file)) { is => + kubernetesClient.pods().load(is) + } + val resolvedContainers = providedPodSpec.get.getSpec.getContainers.asScala + var foundExecutorContainer = false + for (container <- resolvedContainers) { + if (container.getName == executorCustomSpecExecutorContainerName.get) { + foundExecutorContainer = true + + val resolvedEnv = new ArrayBuffer[EnvVar] + resolvedEnv ++= container.getEnv.asScala + resolvedEnv ++= requiredEnv + container.setEnv(resolvedEnv.asJava) + + val resolvedPorts = new ArrayBuffer[ContainerPort] + resolvedPorts ++= container.getPorts.asScala + resolvedPorts ++= requiredPorts + container.setPorts(resolvedPorts.asJava) + + val resolvedVolumeMounts = new ArrayBuffer[VolumeMount] + resolvedVolumeMounts ++= container.getVolumeMounts.asScala + shuffleServiceVolume.foreach(volume => { + resolvedVolumeMounts += new VolumeMountBuilder() + .withMountPath(volume.getHostPath.getPath) + .withName(volume.getName) + .build() + }) + } + } + val providedVolumes = providedPodSpec.get.getSpec.getVolumes.asScala + val resolvedVolumes = shuffleServiceVolume.map(volume => { + Seq(volume) ++ providedVolumes + }).getOrElse(providedVolumes) + + if (!foundExecutorContainer) { + throw new SparkException("Expected container" + + s" ${executorCustomSpecExecutorContainerName.get}" + + " to be provided as the executor container in the custom" + + " executor replication controller, but it was not found in" + + " the provided spec file.") + } + val editedPod = new PodBuilder(providedPodSpec.get()) + .editMetadata() + .withName(name) + .addToLabels(selectors) + .endMetadata() + .editSpec() + .withContainers(resolvedContainers.asJava) + .withVolumes(resolvedVolumes.asJava) + .endSpec() + .build() + (executorId, kubernetesClient.pods().create(editedPod)) + case None => + (executorId, kubernetesClient.pods().createNew() + .withNewMetadata() + .withName(name) + .withLabels(selectors) + .endMetadata() + .withNewSpec() + .withVolumes(shuffleServiceVolume.map(Seq(_)).getOrElse(Seq[Volume]()).asJava) + .addNewContainer() + .withName(s"exec-${applicationId()}-container") + .withImage(executorDockerImage) + .withImagePullPolicy("IfNotPresent") + .withVolumeMounts(shuffleServiceVolume.map(volume => { + Seq(new VolumeMountBuilder() + .withName(volume.getName) + .withMountPath(volume.getHostPath.getPath) + .build()) + }).getOrElse(Seq[VolumeMount]()).asJava) + .withNewResources() + .addToRequests("memory", executorMemoryQuantity) + .addToLimits("memory", executorMemoryLimitQuantity) + .endResources() + .withEnv(requiredEnv.asJava) + .withPorts(requiredPorts.asJava) + .endContainer() + .endSpec() + .done()) + } + } + + override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { + EXECUTOR_MODIFICATION_LOCK.synchronized { + if (requestedTotal > totalExpectedExecutors.get) { + logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}" + + s" additional executors, expecting total $requestedTotal and currently" + + s" expected ${totalExpectedExecutors.get}") + for (i <- 0 until (requestedTotal - 
totalExpectedExecutors.get)) { + runningExecutorPods += allocateNewExecutorPod() + } + } + totalExpectedExecutors.set(requestedTotal) + } + true + } + + override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] { + EXECUTOR_MODIFICATION_LOCK.synchronized { + for (executor <- executorIds) { + runningExecutorPods.remove(executor) match { + case Some(pod) => kubernetesClient.pods().delete(pod) + case None => logWarning(s"Unable to remove pod for unknown executor $executor") + } + } + } + true + } + + private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { + if (Utils.isDynamicAllocationEnabled(conf)) { + val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) + val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) + val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", 1) + require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, + s"initial executor number $initialNumExecutors must between min executor number " + + s"$minNumExecutors and max executor number $maxNumExecutors") + + initialNumExecutors + } else { + conf.getInt("spark.executor.instances", defaultNumExecutors) + } + } + + override def stop(): Unit = { + // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context. + // When using Utils.tryLogNonFatalError some of the code fails but without any logs or + // indication as to why. + try { + runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) + } catch { + case e: Throwable => logError("Uncaught exception while shutting down controllers.", e) + } + try { + kubernetesClient.services().withName(kubernetesDriverServiceName).delete() + } catch { + case e: Throwable => logError("Uncaught exception while shutting down driver service.", e) + } + if (authEnabled) { + try { + kubernetesClient.secrets().withName(kubernetesSecretName).delete() + } catch { + case e: Throwable => logError("Uncaught exception while delete app secret.", e) + } + maybeShuffleService.foreach(service => { + try { + sendApplicationCompleteToShuffleService(service) + } catch { + case e: Throwable => logError("Uncaught exception while cleaning up" + + " shuffle service secret.", e) + } + }) + } + try { + kubernetesClient.close() + } catch { + case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e) + } + super.stop() + } + + private def sendApplicationCompleteToShuffleService( + service: ShuffleServiceDaemonSetMetadata): Unit = { + if (service.daemonSetNamespace != kubernetesNamespace) { + logInfo("Sending application complete message to shuffle service.") + val serviceDaemonSet = kubernetesClient + .inNamespace(service.daemonSetNamespace) + .extensions() + .daemonSets() + .withName(service.daemonSetName) + .get + val podLabels = serviceDaemonSet.getSpec.getSelector.getMatchLabels + // An interesting note is that it would be preferable to put the shuffle service pods + // behind a K8s service and just query the service to pick a pod for us. Unfortunately, + // services wouldn't work well with SASL authentication, since SASL requires a series + // of exchanged messages to be sent between the pods, but repeated messages sent to the + // same service may resolve to different pods and corrupt the handshake process. 
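+ // Instead, list the daemon set's pods directly and send the message to one pod at a time
+ // until a pod acknowledges it.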
+ val shuffleServicePods = kubernetesClient + .inNamespace(service.daemonSetNamespace) + .pods() + .withLabels(podLabels) + .list() + .getItems + .asScala + val securityManager = sc.env.securityManager + val port = conf.getInt("spark.shuffle.service.port", 7337) + var success = false + val shuffleClient = new KubernetesExternalShuffleClient( + SparkTransportConf.fromSparkConf(conf, "shuffle"), + securityManager, + securityManager.isAuthenticationEnabled(), + securityManager.isSaslEncryptionEnabled()) + try { + shuffleClient.init(applicationId()) + for (pod <- shuffleServicePods) { + if (!success) { + val host = pod.getStatus.getPodIP + logInfo(s"Sending application complete message to $host") + try { + shuffleClient.sendApplicationComplete(host, port) + success = true + logInfo("Successfully sent application complete message.") + } catch { + case e: Throwable => logError(s"Failed to send application complete to" + + s" $host:$port. Will try another pod if possible.", e) + } + } + } + } finally { + shuffleClient.close() + } + if (!success) { + throw new IllegalStateException("Failed to send application complete" + + " message to any shuffle service pod.") + } + } + } + + override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { + new KubernetesDriverEndpoint(rpcEnv, properties) + } + + private class KubernetesDriverEndpoint(override val rpcEnv: RpcEnv, + sparkProperties: Seq[(String, String)]) + extends DriverEndpoint(rpcEnv, sparkProperties) { + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + new PartialFunction[Any, Unit]() { + override def isDefinedAt(x: Any): Boolean = { + x match { + case RetrieveSparkProps(executorHostname) => true + case _ => false + } + } + + override def apply(v1: Any): Unit = { + v1 match { + case RetrieveSparkProps(executorId) => + EXECUTOR_MODIFICATION_LOCK.synchronized { + var resolvedProperties = sparkProperties + maybeShuffleService.foreach(service => { + // Refresh the pod so we get the status, particularly what host it's running on + val runningExecutorPod = kubernetesClient + .pods() + .withName(runningExecutorPods(executorId).getMetadata.getName) + .get() + val shuffleServiceDaemonSet = kubernetesClient + .extensions() + .daemonSets() + .inNamespace(service.daemonSetNamespace) + .withName(service.daemonSetName) + .get() + val shuffleServiceForPod = kubernetesClient + .inNamespace(service.daemonSetNamespace) + .pods() + .inNamespace(service.daemonSetNamespace) + .withLabels(shuffleServiceDaemonSet.getSpec.getSelector.getMatchLabels) + .list() + .getItems + .asScala + .filter(_.getStatus.getHostIP == runningExecutorPod.getStatus.getHostIP) + .head + resolvedProperties = resolvedProperties ++ Seq( + ("spark.shuffle.service.host", shuffleServiceForPod.getStatus.getPodIP)) + }) + if (authEnabled) { + // Don't pass the secret here, it's been set in the executor environment + // already. 
+ resolvedProperties = resolvedProperties + .filterNot(_._1 == "spark.authenticate.secret") + .filterNot(_._1 == "spark.authenticate") + } + context.reply(resolvedProperties) + } + } + } + }.orElse(super.receiveAndReply(context)) + } + } + + private def getShuffleServiceDaemonSet(serviceMetadata: ShuffleServiceDaemonSetMetadata) + : DaemonSet = { + kubernetesClient + .extensions() + .daemonSets() + .inNamespace(serviceMetadata.daemonSetNamespace) + .withName(serviceMetadata.daemonSetName) + .get + } +} + +private object KubernetesClusterSchedulerBackend { + private val SPARK_EXECUTOR_SELECTOR = "spark-exec" + private val SPARK_APP_SELECTOR = "spark-app" + private val DEFAULT_STATIC_PORT = 10000 + private val DEFAULT_BLOCKMANAGER_PORT = 7079 + private val BLOCK_MANAGER_PORT_NAME = "blockmanager" + private val EXECUTOR_PORT_NAME = "executor" + private val MEMORY_OVERHEAD_FACTOR = 0.10 + private val MEMORY_OVERHEAD_MIN = 384L + private val SHUFFLE_SECRET_NAME = "spark-shuffle-secret" +} + +private case class ShuffleServiceDaemonSetMetadata( + val daemonSetName: String, + val daemonSetNamespace: String) diff --git a/kubernetes/docker-minimal-bundle/pom.xml b/kubernetes/docker-minimal-bundle/pom.xml new file mode 100644 index 0000000000000..ff8e01d4ed34d --- /dev/null +++ b/kubernetes/docker-minimal-bundle/pom.xml @@ -0,0 +1,150 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent_2.11 + 2.1.0-SNAPSHOT + ../pom.xml + + + spark-docker-minimal-bundle_2.11 + Spark Project Docker Minimal Bundle + http://spark.apache.org/ + pom + + + docker-minimal-bundle + none + pre-integration-test + + + + + org.apache.spark + spark-assembly_${scala.binary.version} + ${project.version} + pom + + + + com.google.guava + guava + ${hadoop.deps.scope} + + + + + org.apache.spark + spark-kubernetes_${scala.binary.version} + ${project.version} + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + driver-docker-dist + pre-integration-test + + single + + + + src/main/assembly/driver-assembly.xml + + posix + + + + executor-docker-dist + pre-integration-test + + single + + + + src/main/assembly/executor-assembly.xml + + posix + + + + shuffle-service-docker-dist + pre-integration-test + + single + + + + src/main/assembly/shuffle-service-assembly.xml + + posix + + + + + + + + + + + hive + + + org.apache.spark + spark-hive_${scala.binary.version} + ${project.version} + + + + + hive-thriftserver + + + org.apache.spark + spark-hive-thriftserver_${scala.binary.version} + ${project.version} + + + + + spark-ganglia-lgpl + + + org.apache.spark + spark-ganglia-lgpl_${scala.binary.version} + ${project.version} + + + + + diff --git a/kubernetes/docker-minimal-bundle/src/main/assembly/driver-assembly.xml b/kubernetes/docker-minimal-bundle/src/main/assembly/driver-assembly.xml new file mode 100644 index 0000000000000..145244f34d1d9 --- /dev/null +++ b/kubernetes/docker-minimal-bundle/src/main/assembly/driver-assembly.xml @@ -0,0 +1,84 @@ + + + driver-docker-dist + + tar.gz + dir + + false + + + + ${project.parent.basedir}/core/src/main/resources/org/apache/spark/ui/static/ + + ui-resources/org/apache/spark/ui/static + + **/* + + + + + ${project.parent.basedir}/sbin/ + + sbin + + **/* + + + + + ${project.parent.basedir}/bin/ + + bin + + **/* + + + + + ${project.parent.basedir}/conf/ + + conf + + **/* + + + + + src/main/docker/driver + + + + **/* + + + + + + jars + true + false + runtime + false + + org.apache.spark:spark-assembly_${scala.binary.version}:pom + org.spark-project.spark:unused + + + + diff --git 
a/kubernetes/docker-minimal-bundle/src/main/assembly/executor-assembly.xml b/kubernetes/docker-minimal-bundle/src/main/assembly/executor-assembly.xml new file mode 100644 index 0000000000000..d97ba56562a12 --- /dev/null +++ b/kubernetes/docker-minimal-bundle/src/main/assembly/executor-assembly.xml @@ -0,0 +1,84 @@ + + + executor-docker-dist + + tar.gz + dir + + false + + + + ${project.parent.basedir}/core/src/main/resources/org/apache/spark/ui/static/ + + ui-resources/org/apache/spark/ui/static + + **/* + + + + + ${project.parent.basedir}/sbin/ + + sbin + + **/* + + + + + ${project.parent.basedir}/bin/ + + bin + + **/* + + + + + ${project.parent.basedir}/conf/ + + conf + + **/* + + + + + src/main/docker/executor + + + + **/* + + + + + + jars + true + false + runtime + false + + org.apache.spark:spark-assembly_${scala.binary.version}:pom + org.spark-project.spark:unused + + + + diff --git a/kubernetes/docker-minimal-bundle/src/main/assembly/shuffle-service-assembly.xml b/kubernetes/docker-minimal-bundle/src/main/assembly/shuffle-service-assembly.xml new file mode 100644 index 0000000000000..2455d9ced926e --- /dev/null +++ b/kubernetes/docker-minimal-bundle/src/main/assembly/shuffle-service-assembly.xml @@ -0,0 +1,84 @@ + + + shuffle-service-docker-dist + + tar.gz + dir + + false + + + + ${project.parent.basedir}/core/src/main/resources/org/apache/spark/ui/static/ + + ui-resources/org/apache/spark/ui/static + + **/* + + + + + ${project.parent.basedir}/sbin/ + + sbin + + **/* + + + + + ${project.parent.basedir}/bin/ + + bin + + **/* + + + + + ${project.parent.basedir}/conf/ + + conf + + **/* + + + + + src/main/docker/shuffle-service + + + + **/* + + + + + + jars + true + false + runtime + false + + org.apache.spark:spark-assembly_${scala.binary.version}:pom + org.spark-project.spark:unused + + + + diff --git a/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile new file mode 100644 index 0000000000000..bd29102dff0f3 --- /dev/null +++ b/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile @@ -0,0 +1,25 @@ +FROM ubuntu:trusty + +# Upgrade package index +# install a few other useful packages plus Open Jdk 7 +# Remove unneeded /var/lib/apt/lists/* after install to reduce the +# docker image size (by ~30MB) +RUN apt-get update && \ + apt-get install -y less openjdk-7-jre-headless net-tools vim-tiny sudo openssh-server procps && \ + rm -rf /var/lib/apt/lists/* + +RUN mkdir -p /opt/spark +RUN mkdir -p /opt/spark/ui-resources/org/apache/spark/ui/static +RUN touch /opt/spark/RELEASE + +ADD jars /opt/spark/jars +ADD bin /opt/spark/bin +ADD sbin /opt/spark/sbin +ADD conf /opt/spark/conf + +ENV SPARK_HOME /opt/spark +ENV JAVA_HOME /usr/lib/jvm/java-7-openjdk-amd64/jre + +WORKDIR /opt/spark + +CMD exec ${JAVA_HOME}/bin/java -cp /opt/spark/jars/\* org.apache.spark.deploy.kubernetes.driverlauncher.KubernetesDriverLauncherServer --driver-launcher-service-name $SPARK_DRIVER_LAUNCHER_SERVICE_NAME --application-secret-env $SPARK_DRIVER_LAUNCHER_APP_SECRET_ENV_NAME --driver-launcher-port $SPARK_DRIVER_LAUNCHER_SERVER_PORT diff --git a/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile new file mode 100644 index 0000000000000..45a0710fbf019 --- /dev/null +++ b/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile @@ -0,0 +1,26 @@ +FROM ubuntu:trusty + +# Upgrade package index +# install a few other useful packages 
plus Open Jdk 7 +# Remove unneeded /var/lib/apt/lists/* after install to reduce the +# docker image size (by ~30MB) +RUN apt-get update && \ + apt-get install -y less openjdk-7-jre-headless net-tools vim-tiny sudo openssh-server procps && \ + rm -rf /var/lib/apt/lists/* + +RUN mkdir -p /opt/spark +RUN mkdir -p /opt/spark/ui-resources/org/apache/spark/ui/static +RUN touch /opt/spark/RELEASE + +ADD jars /opt/spark/jars +ADD bin /opt/spark/bin +ADD sbin /opt/spark/sbin +ADD conf /opt/spark/conf + +ENV SPARK_HOME /opt/spark +ENV JAVA_HOME /usr/lib/jvm/java-7-openjdk-amd64/jre + +WORKDIR /opt/spark + +# TODO support spark.executor.extraClassPath +CMD exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Dspark.authenticate=$SPARK_AUTH_ENABLED -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp ${SPARK_HOME}/jars/\* org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $HOSTNAME diff --git a/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile b/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile new file mode 100644 index 0000000000000..067bef149dde3 --- /dev/null +++ b/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile @@ -0,0 +1,25 @@ +FROM ubuntu:trusty + +# Upgrade package index +# install a few other useful packages plus Open Jdk 7 +# Remove unneeded /var/lib/apt/lists/* after install to reduce the +# docker image size (by ~30MB) +RUN apt-get update && \ + apt-get install -y less openjdk-7-jre-headless net-tools vim-tiny sudo openssh-server procps && \ + rm -rf /var/lib/apt/lists/* + +RUN mkdir -p /opt/spark +RUN mkdir -p /opt/spark/ui-resources/org/apache/spark/ui/static +RUN touch /opt/spark/RELEASE + +ADD jars /opt/spark/jars +ADD bin /opt/spark/bin +ADD sbin /opt/spark/sbin +ADD conf /opt/spark/conf + +ENV SPARK_HOME /opt/spark +ENV JAVA_HOME /usr/lib/jvm/java-7-openjdk-amd64/jre + +WORKDIR /opt/spark + +CMD exec ${JAVA_HOME}/bin/java -Dspark.shuffle.service.port=$SPARK_SHUFFLE_SERVICE_PORT -Dspark.authenticate=$SPARK_AUTH_ENABLED -Xms$SPARK_SHUFFLE_SERVICE_MEMORY -Xmx$SPARK_SHUFFLE_SERVICE_MEMORY -cp ${SPARK_HOME}/jars/\* org.apache.spark.deploy.kubernetes.shuffle.KubernetesShuffleService --shuffle-service-secrets-namespace $SHUFFLE_SERVICE_SECRETS_NAMESPACE --kubernetes-master $KUBERNETES_MASTER diff --git a/kubernetes/integration-tests-spark-jobs/pom.xml b/kubernetes/integration-tests-spark-jobs/pom.xml new file mode 100644 index 0000000000000..17f1c4906214f --- /dev/null +++ b/kubernetes/integration-tests-spark-jobs/pom.xml @@ -0,0 +1,45 @@ + + + + 4.0.0 + + org.apache.spark + spark-parent_2.11 + 2.1.0-SNAPSHOT + ../../pom.xml + + + spark-kubernetes-integration-tests-spark-jobs_2.11 + jar + Spark Project Kubernetes Integration Tests Spark Jobs + + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + provided + + + diff --git a/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/SparkPiWithInfiniteWait.scala b/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/SparkPiWithInfiniteWait.scala new file mode 100644 index 0000000000000..6e4660b771305 --- /dev/null +++ 
b/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/SparkPiWithInfiniteWait.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest.jobs + +import scala.math.random + +import org.apache.spark.sql.SparkSession + +// Equivalent to SparkPi except does not stop the Spark Context +// at the end and spins forever, so other things can inspect the +// Spark UI immediately after the fact. +private[spark] object SparkPiWithInfiniteWait { + + def main(args: Array[String]): Unit = { + val spark = SparkSession + .builder + .appName("Spark Pi") + .getOrCreate() + val slices = if (args.length > 0) args(0).toInt else 10 + val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow + val count = spark.sparkContext.parallelize(1 until n, slices).map { i => + val x = random * 2 - 1 + val y = random * 2 - 1 + if (x*x + y*y < 1) 1 else 0 + }.reduce(_ + _) + // scalastyle:off println + println("Pi is roughly " + 4.0 * count / (n - 1)) + // scalastyle:on println + + // Spin forever to keep the Spark UI active, so other things can inspect the job. 
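    // The ten-minute sleep interval below is arbitrary; the only requirement is that main()
    // never returns, because the integration-test harness polls the driver's REST API while
    // this job is still considered running.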
+ while (true) { + Thread.sleep(600000) + } + } + +} diff --git a/kubernetes/integration-tests/pom.xml b/kubernetes/integration-tests/pom.xml new file mode 100644 index 0000000000000..470db9c7d3ea4 --- /dev/null +++ b/kubernetes/integration-tests/pom.xml @@ -0,0 +1,226 @@ + + + + 4.0.0 + + org.apache.spark + spark-parent_2.11 + 2.1.0-SNAPSHOT + ../../pom.xml + + + spark-kubernetes-integration-tests_2.11 + jar + Spark Project Kubernetes Integration Tests + + + + org.apache.spark + spark-kubernetes_${scala.binary.version} + ${project.version} + test + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.spark + spark-kubernetes-integration-tests-spark-jobs_${scala.binary.version} + ${project.version} + test + + + org.apache.spark + spark-docker-minimal-bundle_${scala.binary.version} + ${project.version} + tar.gz + driver-docker-dist + test + + + * + * + + + + + com.google.guava + guava + test + + 18.0 + + + com.spotify + docker-client + test + + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + + + com.fasterxml.jackson.core + jackson-databind + + + org.glassfish.jersey.core + jersey-client + + + org.glassfish.jersey.core + jersey-common + + + javax.ws.rs + jsr311-api + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-test-spark-jobs + pre-integration-test + + copy + + + + + org.apache.spark + spark-kubernetes-integration-tests-spark-jobs_${scala.binary.version} + ${project.version} + jar + ${project.build.directory}/integration-tests-spark-jobs + + + + + + unpack-docker-driver-bundle + pre-integration-test + + unpack + + + + + org.apache.spark + spark-docker-minimal-bundle_${scala.binary.version} + ${project.version} + driver-docker-dist + tar.gz + true + ${project.build.directory}/docker/driver + + + + + + unpack-docker-executor-bundle + pre-integration-test + + unpack + + + + + org.apache.spark + spark-docker-minimal-bundle_${scala.binary.version} + ${project.version} + executor-docker-dist + tar.gz + true + ${project.build.directory}/docker/executor + + + + + + unpack-docker-shuffle-service-bundle + pre-integration-test + + unpack + + + + + org.apache.spark + spark-docker-minimal-bundle_${scala.binary.version} + ${project.version} + shuffle-service-docker-dist + tar.gz + true + ${project.build.directory}/docker/shuffle-service + + + + + + + + com.googlecode.maven-download-plugin + download-maven-plugin + 1.3.0 + + + download-minikube-linux + pre-integration-test + + wget + + + https://storage.googleapis.com/minikube/releases/v0.12.2/minikube-linux-amd64 + ${project.build.directory}/minikube-bin/linux-amd64 + minikube + + + + download-minikube-darwin + pre-integration-test + + wget + + + https://storage.googleapis.com/minikube/releases/v0.12.2/minikube-darwin-amd64 + ${project.build.directory}/minikube-bin/darwin-amd64 + minikube + + + + + + + + + diff --git a/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala new file mode 100644 index 0000000000000..d8e6b09ccc267 --- /dev/null +++ b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest + +import java.io.File +import java.nio.file.Paths +import java.util.UUID + +import com.google.common.base.Charsets +import com.google.common.collect.ImmutableList +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.{Config, KubernetesClient} +import org.apache.commons.io.FileUtils +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Seconds, Span} +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.kubernetes.{Client, ClientArguments} +import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder +import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube +import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1 +import org.apache.spark.deploy.kubernetes.shuffle.{StartKubernetesShuffleService, StartKubernetesShuffleServiceArgumentsBuilder} +import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus} +import org.apache.spark.SparkFunSuite + +private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { + + private val EXAMPLES_JAR = Paths.get("target", "integration-tests-spark-jobs") + .toFile + .listFiles()(0) + .getAbsolutePath + + private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) + private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + private val MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + + ".integrationtest.jobs.SparkPiWithInfiniteWait" + private val NAMESPACE = UUID.randomUUID().toString.replaceAll("-", "") + private val SHUFFLE_SERVICE_NAMESPACE = UUID.randomUUID().toString.replaceAll("-", "") + private val SHUFFLE_SERVICE_NAME = "spark-shuffle-service" + private var minikubeKubernetesClient: KubernetesClient = _ + private var clientConfig: Config = _ + + override def beforeAll(): Unit = { + Minikube.startMinikube() + new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() + Minikube.getKubernetesClient.namespaces.createNew() + .withNewMetadata() + .withName(NAMESPACE) + .endMetadata() + .done() + minikubeKubernetesClient = Minikube.getKubernetesClient.inNamespace(NAMESPACE) + Minikube.getKubernetesClient.namespaces.createNew() + .withNewMetadata() + .withName(SHUFFLE_SERVICE_NAMESPACE) + .endMetadata() + .done() + + Minikube.executeMinikubeSsh("mkdir -p /tmp/spark-shuffles") + clientConfig = minikubeKubernetesClient.getConfiguration + val startShuffleArgs = new StartKubernetesShuffleServiceArgumentsBuilder() + .copy(kubernetesMaster = Some(s"https://${Minikube.getMinikubeIp}:8443")) + .copy(shuffleServiceNamespace = Some(SHUFFLE_SERVICE_NAMESPACE)) + .copy(shuffleServiceDaemonSetName = 
SHUFFLE_SERVICE_NAME) + .copy(shuffleServiceDockerImage = "spark-shuffle-service:latest") + .copy(shuffleServiceMemory = "512m") + .copy(shuffleServiceAuthEnabled = true) + .copy(kubernetesCaCertFile = Some(clientConfig.getCaCertFile)) + .copy(kubernetesClientCertFile = Some(clientConfig.getClientCertFile)) + .copy(kubernetesClientKeyFile = Some(clientConfig.getClientKeyFile)) + .copy(shuffleHostPathDir = "/tmp/spark-shuffles") + .build() + new StartKubernetesShuffleService().run(startShuffleArgs) + } + + before { + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(minikubeKubernetesClient.pods().list().getItems.isEmpty) + assert(minikubeKubernetesClient.services().list().getItems.isEmpty) + } + } + + after { + val pods = minikubeKubernetesClient.pods().list().getItems.asScala + pods.par.foreach(pod => { + minikubeKubernetesClient + .pods() + .withName(pod.getMetadata.getName) + .withGracePeriod(60) + .delete + }) + } + + override def afterAll(): Unit = { + if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { + Minikube.deleteMinikube() + } + } + + private def expectationsForStaticAllocation(sparkMetricsService: SparkRestApiV1): Unit = { + val apps = Eventually.eventually(TIMEOUT, INTERVAL) { + val result = sparkMetricsService + .getApplications(ImmutableList.of(ApplicationStatus.RUNNING, ApplicationStatus.COMPLETED)) + assert(result.size == 1 + && !result.head.id.equalsIgnoreCase("appid") + && !result.head.id.equalsIgnoreCase("{appId}")) + result + } + Eventually.eventually(TIMEOUT, INTERVAL) { + val result = sparkMetricsService.getExecutors(apps.head.id) + assert(result.size == 3) + assert(result.count(exec => exec.id != "driver") == 2) + result + } + Eventually.eventually(TIMEOUT, INTERVAL) { + val result = sparkMetricsService.getStages( + apps.head.id, Seq(StageStatus.COMPLETE).asJava) + assert(result.size == 1) + result + } + } + + test("Run a simple example") { + val args = ClientArguments.builder() + .userMainClass(MAIN_CLASS) + .addJar(s"file://$EXAMPLES_JAR") + .addSparkConf("spark.master", "kubernetes") + .addSparkConf("spark.driver.memory", "512m") + .addSparkConf("spark.executor.memory", "512m") + .addSparkConf("spark.executor.instances", "2") + .addSparkConf("spark.executor.cores", "1") + .kubernetesAppName("spark-pi") + .driverDockerImage("spark-driver:latest") + .executorDockerImage("spark-executor:latest") + .kubernetesAppNamespace(NAMESPACE) + .kubernetesMaster(s"https://${Minikube.getMinikubeIp}:8443") + .kubernetesCaCertFile(clientConfig.getCaCertFile) + .kubernetesClientCertFile(clientConfig.getClientCertFile) + .kubernetesClientKeyFile(clientConfig.getClientKeyFile) + .build() + new Client(args).run() + + val sparkMetricsService = Minikube.getService[SparkRestApiV1]( + "spark-pi", NAMESPACE, "spark-ui-port") + expectationsForStaticAllocation(sparkMetricsService) + } + + test("Dynamic allocation mode") { + val args = ClientArguments.builder() + .userMainClass(MAIN_CLASS) + .addJar(s"file://$EXAMPLES_JAR") + .addSparkConf("spark.master", "kubernetes") + .addSparkConf("spark.driver.memory", "512m") + .addSparkConf("spark.executor.memory", "512m") + .addSparkConf("spark.dynamicAllocation.enabled", "true") + .addSparkConf("spark.dynamicAllocation.minExecutors", "0") + .addSparkConf("spark.dynamicAllocation.initialExecutors", "0") + .addSparkConf("spark.dynamicAllocation.maxExecutors", "1") + .addSparkConf("spark.executor.cores", "1") + .addSparkConf("spark.shuffle.service.enabled", "true") + 
.addSparkConf("spark.dynamicAllocation.executorIdleTimeout", "20s") + .addSparkConf("spark.memory.storageFraction", "1.0") + .addSparkConf("spark.authenticate", "true") + .kubernetesAppName("spark-pi-dyn") + .driverDockerImage("spark-driver:latest") + .executorDockerImage("spark-executor:latest") + .addSparkConf("spark.kubernetes.shuffle.service.daemonset.name", SHUFFLE_SERVICE_NAME) + .addSparkConf("spark.kubernetes.shuffle.service.daemonset.namespace", SHUFFLE_SERVICE_NAMESPACE) + .kubernetesAppNamespace(NAMESPACE) + .kubernetesMaster(s"https://${Minikube.getMinikubeIp}:8443") + .kubernetesCaCertFile(clientConfig.getCaCertFile) + .kubernetesClientCertFile(clientConfig.getClientCertFile) + .kubernetesClientKeyFile(clientConfig.getClientKeyFile) + .build() + new Client(args).run() + + val sparkMetricsService = Minikube.getService[SparkRestApiV1]( + "spark-pi-dyn", NAMESPACE, "spark-ui-port") + expectationsForDynamicAllocation(sparkMetricsService) + } + + test("Dynamic allocation pod management") { + val args = ClientArguments.builder() + .userMainClass(MAIN_CLASS) + .addJar(s"file://$EXAMPLES_JAR") + .addSparkConf("spark.master", "kubernetes") + .addSparkConf("spark.driver.memory", "512m") + .addSparkConf("spark.executor.memory", "512m") + .addSparkConf("spark.dynamicAllocation.enabled", "true") + .addSparkConf("spark.dynamicAllocation.minExecutors", "1") + .addSparkConf("spark.dynamicAllocation.initialExecutors", "2") + .addSparkConf("spark.dynamicAllocation.maxExecutors", "2") + .addSparkConf("spark.executor.cores", "1") + .addSparkConf("spark.shuffle.service.enabled", "true") + .addSparkConf("spark.dynamicAllocation.executorIdleTimeout", "10s") + .addSparkConf("spark.memory.storageFraction", "1.0") + .addSparkConf("spark.authenticate", "true") + .kubernetesAppName("spark-pi-dyn") + .driverDockerImage("spark-driver:latest") + .executorDockerImage("spark-executor:latest") + .addSparkConf("spark.kubernetes.shuffle.service.daemonset.name", SHUFFLE_SERVICE_NAME) + .addSparkConf("spark.kubernetes.shuffle.service.daemonset.namespace", SHUFFLE_SERVICE_NAMESPACE) + .kubernetesAppNamespace(NAMESPACE) + .kubernetesMaster(s"https://${Minikube.getMinikubeIp}:8443") + .kubernetesCaCertFile(clientConfig.getCaCertFile) + .kubernetesClientCertFile(clientConfig.getClientCertFile) + .kubernetesClientKeyFile(clientConfig.getClientKeyFile) + .build() + new Client(args).run() + def getExecutorPods: Iterable[Pod] = + minikubeKubernetesClient.pods() + .list() + .getItems + .asScala + .filter(pod => pod + .getSpec + .getContainers + .asScala + .count(container => container.getImage.equals("spark-executor:latest")) == 1) + Eventually.eventually(TIMEOUT, INTERVAL) { + val executorPods = getExecutorPods + assert(executorPods.size == 2) + executorPods.foreach(pod => assert(pod.getStatus.getPhase == "Running")) + } + + Eventually.eventually(TIMEOUT, INTERVAL) { + val executorPods = getExecutorPods + assert(executorPods.size == 1) + executorPods.foreach(pod => assert(pod.getStatus.getPhase == "Running")) + } + } + + private def expectationsForDynamicAllocation(sparkMetricsService: SparkRestApiV1): Unit = { + val apps = Eventually.eventually(TIMEOUT, INTERVAL) { + val result = sparkMetricsService + .getApplications(ImmutableList.of(ApplicationStatus.RUNNING, ApplicationStatus.COMPLETED)) + // Sometimes "appId" comes back as the app ID when the app is initializing. 
+ assert(result.size == 1 + && !result.head.id.equalsIgnoreCase("appid") + && !result.head.id.equalsIgnoreCase("{appId}")) + result + } + val executors = Eventually.eventually(TIMEOUT, INTERVAL) { + val result = sparkMetricsService.getExecutors(apps.head.id) + assert(result.count(exec => exec.id != "driver") == 1) + assert(result.size == 2) + result + } + val completedApp = Eventually.eventually(TIMEOUT, INTERVAL) { + val result = sparkMetricsService.getStages( + apps.head.id, Seq(StageStatus.COMPLETE).asJava) + val allStages = sparkMetricsService.getStages( + apps.head.id, Seq[StageStatus]( + StageStatus.ACTIVE, + StageStatus.COMPLETE, + StageStatus.FAILED, + StageStatus.PENDING).asJava) + assert(result.size == allStages.size) + result + } + val noExecutorsAfterFinishedJob = Eventually.eventually(TIMEOUT, INTERVAL) { + val result = sparkMetricsService.getExecutors(apps.head.id) + assert(result.size == 1) + assert(result.count(exec => exec.id != "driver") == 0) + result + } + } + + test("Custom executor specification file") { + val executorSpecificationYml = + """ + |apiVersion: v1 + |kind: Pod + |metadata: + | name: executor-and-nginx + | labels: + | app: executor-and-nginx + |spec: + | containers: + | - name: nginx + | image: nginx:1.11.5-alpine + | ports: + | - containerPort: 80 + | - name: executor + | image: spark-executor:latest + | imagePullPolicy: Never + | + """.stripMargin + val writtenSpecFile = new File(Files.createTempDir(), "executor-replication-controller.yml") + FileUtils.write(writtenSpecFile, executorSpecificationYml, Charsets.UTF_8) + val args = ClientArguments.builder() + .userMainClass(MAIN_CLASS) + .addJar(s"file://$EXAMPLES_JAR") + .addSparkConf("spark.master", "kubernetes") + .addSparkConf("spark.driver.memory", "512m") + .addSparkConf("spark.executor.memory", "512m") + .addSparkConf("spark.dynamicAllocation.enabled", "true") + .addSparkConf("spark.dynamicAllocation.minExecutors", "0") + .addSparkConf("spark.dynamicAllocation.initialExecutors", "0") + .addSparkConf("spark.dynamicAllocation.maxExecutors", "1") + .addSparkConf("spark.executor.cores", "1") + .addSparkConf("spark.shuffle.service.enabled", "true") + .addSparkConf("spark.dynamicAllocation.executorIdleTimeout", "20s") + .addSparkConf("spark.memory.storageFraction", "1.0") + .addSparkConf("spark.authenticate", "true") + .kubernetesAppName("spark-pi-custom") + .driverDockerImage("spark-driver:latest") + .executorDockerImage("spark-executor:latest") + .addSparkConf("spark.kubernetes.shuffle.service.daemonset.name", SHUFFLE_SERVICE_NAME) + .addSparkConf("spark.kubernetes.shuffle.service.daemonset.namespace", SHUFFLE_SERVICE_NAMESPACE) + .kubernetesAppNamespace(NAMESPACE) + .kubernetesMaster(s"https://${Minikube.getMinikubeIp}:8443") + .kubernetesCaCertFile(clientConfig.getCaCertFile) + .kubernetesClientCertFile(clientConfig.getClientCertFile) + .kubernetesClientKeyFile(clientConfig.getClientKeyFile) + .customExecutorSpecFile(writtenSpecFile.getAbsolutePath) + .customExecutorSpecContainerName("executor") + .build() + new Client(args).run() + + Eventually.eventually(TIMEOUT, INTERVAL) { + val executorPods = minikubeKubernetesClient + .pods() + .list() + .getItems + .asScala + .filter(_.getStatus.getContainerStatuses.asScala.exists( + _.getImage == "spark-executor:latest")) + assert(executorPods.nonEmpty) + executorPods.foreach(pod => assert( + pod.getStatus.getContainerStatuses.asScala.exists(_.getImage == "nginx:1.11.5-alpine"))) + } + } + +} diff --git 
a/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala new file mode 100644 index 0000000000000..9dd2eb2d97d93 --- /dev/null +++ b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest.docker + +import java.net.URI +import java.nio.file.Paths + +import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates} +import org.apache.http.client.utils.URIBuilder +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Seconds, Span} + +private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, String]) { + + private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) + private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", + throw new IllegalStateException("DOCKER_HOST env not found.")) + + private val originalDockerUri = URI.create(dockerHost) + private val httpsDockerUri = new URIBuilder() + .setHost(originalDockerUri.getHost) + .setPort(originalDockerUri.getPort) + .setScheme("https") + .build() + + private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH", + throw new IllegalStateException("DOCKER_CERT_PATH env not found.")) + + private val dockerClient = new DefaultDockerClient.Builder() + .uri(httpsDockerUri) + .dockerCertificates(DockerCertificates + .builder() + .dockerCertPath(Paths.get(dockerCerts)) + .build().get()) + .build() + + def buildSparkDockerImages(): Unit = { + Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } + dockerClient.build(Paths.get("target", "docker", "driver"), "spark-driver") + dockerClient.build(Paths.get("target", "docker", "executor"), "spark-executor") + dockerClient.build(Paths.get("target", "docker", "shuffle-service"), "spark-shuffle-service") + } + +} \ No newline at end of file diff --git a/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala new file mode 100644 index 0000000000000..30a7a46a8e514 --- /dev/null +++ b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + 
* contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest.minikube + +import java.io.{BufferedReader, FileOutputStream, InputStreamReader} +import java.net.URL +import java.nio.channels.Channels +import java.nio.file.Paths +import java.util.concurrent.TimeUnit +import javax.net.ssl.X509TrustManager + +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} +import io.fabric8.kubernetes.client.internal.SSLUtils +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.spark.deploy.kubernetes.httpclients.HttpClientUtil +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +// TODO support windows +private[spark] object Minikube extends Logging { + private val MINIKUBE_EXECUTABLE_DEST = if (Utils.isMac) { + Paths.get("target", "minikube-bin", "darwin-amd64", "minikube").toFile + } else if (Utils.isWindows) { + throw new IllegalStateException("Executing Minikube based integration tests not yet " + + " available on Windows.") + } else { + Paths.get("target", "minikube-bin", "linux-amd64", "minikube").toFile + } + + private val EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE = "Minikube is not downloaded, expected at " + + s"${MINIKUBE_EXECUTABLE_DEST.getAbsolutePath}" + + private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60 + + def startMinikube(): Unit = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + if (getMinikubeStatus != MinikubeStatus.RUNNING) { + executeMinikube("start", "--memory", "6000", "--cpus", "8") + } else { + logInfo("Minikube is already started.") + } + } + + def getMinikubeIp: String = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + val outputs = executeMinikube("ip") + assert(outputs.size == 1, "Unexpected amount of output from minikube ip") + outputs.head + } + + def getMinikubeStatus: MinikubeStatus.Value = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + val statusString = executeMinikube("status").head.replaceFirst("minikubeVM: ", "") + MinikubeStatus.unapply(statusString) + .getOrElse(throw new IllegalStateException(s"Unknown status $statusString")) + } + + def getDockerEnv: Map[String, String] = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + executeMinikube("docker-env") + .filter(_.startsWith("export")) + .map(_.replaceFirst("export ", "").split('=')) + .map(arr => (arr(0), arr(1).replaceAllLiterally("\"", ""))) + .toMap + } + + def deleteMinikube(): Unit = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + if (getMinikubeStatus != MinikubeStatus.DOES_NOT_EXIST) { + executeMinikube("delete") + } else { + logInfo("Minikube was 
already not running.") + } + } + + def getKubernetesClient: DefaultKubernetesClient = synchronized { + val kubernetesMaster = s"https://$getMinikubeIp:8443" + val userHome = System.getProperty("user.home") + val kubernetesConf = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(kubernetesMaster) + .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath) + .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath) + .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath) + .build() + new DefaultKubernetesClient(kubernetesConf) + } + + def getService[T: ClassTag]( + serviceName: String, + namespace: String, + servicePortName: String, + servicePath: String = ""): T = synchronized { + val kubernetesMaster = s"https://$getMinikubeIp:8443" + val url = s"${ + Array[String]( + kubernetesMaster, + "api", "v1", "proxy", + "namespaces", namespace, + "services", serviceName).mkString("/")}" + + s":$servicePortName$servicePath" + val userHome = System.getProperty("user.home") + val kubernetesConf = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(kubernetesMaster) + .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath) + .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath) + .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath) + .build() + val sslContext = SSLUtils.sslContext(kubernetesConf) + val trustManager = SSLUtils.trustManagers(kubernetesConf)(0).asInstanceOf[X509TrustManager] + HttpClientUtil.createClient[T](url, sslContext.getSocketFactory, trustManager) + } + + def executeMinikubeSsh(command: String): Unit = { + executeMinikube("ssh", command) + } + + private def executeMinikube(action: String, args: String*): Seq[String] = { + if (!MINIKUBE_EXECUTABLE_DEST.canExecute) { + if (!MINIKUBE_EXECUTABLE_DEST.setExecutable(true)) { + throw new IllegalStateException("Failed to make the Minikube binary executable.") + } + } + val fullCommand = Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args + val pb = new ProcessBuilder().command(fullCommand: _*) + pb.redirectErrorStream(true) + val proc = pb.start() + val outputLines = new ArrayBuffer[String] + + Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput => + Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) => + var line: String = null + do { + line = bufferedOutput.readLine() + if (line != null) { + logInfo(line) + outputLines += line + } + } while (line != null) + } + } + assert(proc.waitFor(MINIKUBE_STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS), + s"Timed out while executing $action on minikube.") + assert(proc.exitValue == 0, s"Failed to execute minikube $action ${args.mkString(" ")}") + outputLines.toSeq + } +} + +private[spark] object MinikubeStatus extends Enumeration { + + val RUNNING = status("Running") + val STOPPED = status("Stopped") + val DOES_NOT_EXIST = status("Does Not Exist") + val SAVED = status("Saved") + + def status(value: String): Value = new Val(nextId, value) + def unapply(s: String): Option[Value] = values.find(s == _.toString) +} diff --git a/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/restapis/SparkRestApiV1.scala b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/restapis/SparkRestApiV1.scala new file mode 100644 index 
0000000000000..7a3b06b1b5e58 --- /dev/null +++ b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/restapis/SparkRestApiV1.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest.restapis + +import java.util.{List => JList} +import javax.ws.rs._ +import javax.ws.rs.core.MediaType + +import org.apache.spark.status.api.v1._ + +@Path("/api/v1") +@Consumes(Array(MediaType.APPLICATION_JSON)) +@Produces(Array(MediaType.APPLICATION_JSON)) +trait SparkRestApiV1 { + + @GET + @Path("/applications") + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_JSON)) + def getApplications( + @QueryParam("status") applicationStatuses: JList[ApplicationStatus]): Seq[ApplicationInfo] + + @GET + @Path("applications/{appId}/stages") + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_JSON)) + def getStages( + @PathParam("appId") appId: String, + @QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] + + @GET + @Path("applications/{appId}/executors") + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_JSON)) + def getExecutors(@PathParam("appId") appId: String): Seq[ExecutorSummary] +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java index 6767cc5079649..25d42409f12a5 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java @@ -76,6 +76,19 @@ class SparkSubmitOptionParser { protected final String PRINCIPAL = "--principal"; protected final String QUEUE = "--queue"; + // Kubernetes-only options. 
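  // For illustration only (a hypothetical invocation, not taken from this patch), these flags
  // are intended to be passed through spark-submit alongside the usual --class/--conf options:
  //   --kubernetes-master https://<api-server>:8443 --kubernetes-app-namespace default \
  //   --driver-docker-image spark-driver:latest --executor-docker-image spark-executor:latest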
+ protected final String KUBERNETES_CUSTOM_EXECUTOR_SPEC_FILE = "--custom-executor-spec"; + protected final String KUBERNETES_CUSTOM_EXECUTOR_SPEC_CONTAINER_NAME = "--custom-executor-spec-container-name"; + protected final String KUBERNETES_DRIVER_DOCKER_IMAGE = "--driver-docker-image"; + protected final String KUBERNETES_EXECUTOR_DOCKER_IMAGE = "--executor-docker-image"; + protected final String KUBERNETES_EXPOSE_DRIVER_PORT = "--expose-driver-port"; + protected final String KUBERNETES_APP_NAME = "--kubernetes-app-name"; + protected final String KUBERNETES_MASTER = "--kubernetes-master"; + protected final String KUBERNETES_APP_NAMESPACE = "--kubernetes-app-namespace"; + protected final String KUBERNETES_CLIENT_CERT_FILE = "--kubernetes-client-cert-file"; + protected final String KUBERNETES_CLIENT_KEY_FILE = "--kubernetes-client-key-file"; + protected final String KUBERNETES_CA_CERT_FILE = "--kubernetes-ca-cert-file"; + /** * This is the canonical list of spark-submit options. Each entry in the array contains the * different aliases for the same option; the first element of each entry is the "official" @@ -102,6 +115,17 @@ class SparkSubmitOptionParser { { JARS }, { KEYTAB }, { KILL_SUBMISSION }, + { KUBERNETES_APP_NAME }, + { KUBERNETES_APP_NAMESPACE }, + { KUBERNETES_CA_CERT_FILE }, + { KUBERNETES_CLIENT_CERT_FILE }, + { KUBERNETES_CLIENT_KEY_FILE }, + { KUBERNETES_CUSTOM_EXECUTOR_SPEC_FILE }, + { KUBERNETES_CUSTOM_EXECUTOR_SPEC_CONTAINER_NAME }, + { KUBERNETES_DRIVER_DOCKER_IMAGE }, + { KUBERNETES_EXECUTOR_DOCKER_IMAGE }, + { KUBERNETES_EXPOSE_DRIVER_PORT }, + { KUBERNETES_MASTER }, { MASTER }, { NAME }, { NUM_EXECUTORS }, diff --git a/pom.xml b/pom.xml index aaf7cfa7eb2ad..91b57e2a0d8d7 100644 --- a/pom.xml +++ b/pom.xml @@ -559,6 +559,7 @@ netty 3.8.0.Final + org.apache.derby derby @@ -617,6 +618,36 @@ jackson-module-jaxb-annotations ${fasterxml.jackson.version} + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + ${fasterxml.jackson.version} + + + com.netflix.feign + feign-core + 8.18.0 + + + com.netflix.feign + feign-okhttp + 8.18.0 + + + com.netflix.feign + feign-jackson + 8.18.0 + + + com.netflix.feign + feign-jaxrs + 8.18.0 + + + com.squareup.okhttp3 + okhttp + 3.4.1 + org.glassfish.jersey.core jersey-server @@ -632,6 +663,11 @@ jersey-client ${jersey.version} + + org.glassfish.jersey.connectors + jersey-apache-connector + 2.22.2 + org.glassfish.jersey.containers jersey-container-servlet @@ -757,6 +793,22 @@ + + com.spotify + docker-client + 3.6.6 + test + + + guava + com.google.guava + + + commons-logging + commons-logging + + + mysql mysql-connector-java @@ -2546,6 +2598,23 @@ + + kubernetes + + kubernetes/core + + + + + kubernetes-integration-tests + + kubernetes/docker-minimal-bundle + kubernetes/integration-tests + kubernetes/integration-tests-spark-jobs + + + + hive-thriftserver diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 55e4a833b6707..42ffdcb0181a2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -92,7 +92,7 @@ private[spark] class Client( // Executor related configurations private val executorMemory = sparkConf.get(EXECUTOR_MEMORY) - private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( + private val executorMemoryOverhead = sparkConf.get(YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * 
executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt private val distCacheMgr = new ClientDistributedCacheManager() diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 0b66d1cf08eac..1b26727006328 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -133,7 +133,7 @@ private[yarn] class YarnAllocator( // Executor memory in MB. protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt // Additional memory overhead. - protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( + protected val memoryOverhead: Int = sparkConf.get(YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt // Number of cores per executor. protected val executorCores = sparkConf.get(EXECUTOR_CORES) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 2f9ea1911fd61..90882ab7cb1b1 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -112,7 +112,8 @@ private[spark] abstract class YarnSchedulerBackend( * Get an application ID associated with the job. * This returns the string value of [[appId]] if set, otherwise * the locally-generated ID from the superclass. - * @return The application ID + * + * @return The application ID */ override def applicationId(): String = { appId.map(_.toString).getOrElse { @@ -308,6 +309,15 @@ private[spark] abstract class YarnSchedulerBackend( } } } + + // The num of current max ExecutorId used to re-register appMaster + @volatile protected var currentExecutorIdCounter = 0 + + override def registerExecutorId(executorId: String): Unit = { + if (currentExecutorIdCounter < executorId.toInt) { + currentExecutorIdCounter = executorId.toInt + } + } } private[spark] object YarnSchedulerBackend {
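
A note on the REST-client plumbing used by the tests above: the parent pom pulls in feign-core, feign-jaxrs, feign-jackson, feign-okhttp and okhttp, and the Minikube helper builds its typed service proxies through HttpClientUtil.createClient[T](url, sslSocketFactory, trustManager). HttpClientUtil itself is not part of this diff, so the sketch below only illustrates, under the assumption that it wires Feign in roughly this way, how a JAX-RS-annotated trait such as SparkRestApiV1 can be bound to an HTTP proxy; the object name and builder options here are illustrative, not the patch's actual code.

    import feign.Feign
    import feign.jackson.JacksonDecoder
    import feign.jaxrs.JAXRSContract
    import feign.okhttp.OkHttpClient

    import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1

    object RestApiProxySketch {
      // Binds the @Path/@GET/@QueryParam annotations on SparkRestApiV1 to real HTTP calls.
      // "apiBaseUrl" stands in for the proxied service URL that Minikube.getService constructs.
      def bind(apiBaseUrl: String): SparkRestApiV1 =
        Feign.builder()
          .client(new OkHttpClient())     // transport, matching the okhttp dependency in pom.xml
          .contract(new JAXRSContract())  // interpret the JAX-RS annotations on the trait
          .decoder(new JacksonDecoder())  // deserialize the JSON status-API payloads
          .target(classOf[SparkRestApiV1], apiBaseUrl)
    }

In practice the decoder would also need a Jackson ObjectMapper with the Scala module registered, since the trait's methods return Scala Seq types, and an SSLSocketFactory/X509TrustManager would have to be plugged into the client for the https endpoint, as the signature used by the Minikube helper suggests.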