diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index f090240065bf1..a871ab5d448c3 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -63,12 +63,20 @@ function build {
   if [ ! -d "$IMG_PATH" ]; then
     error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
   fi
-
-  local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+  local BINDING_BUILD_ARGS=(
+    --build-arg
+    base_img=$(image_ref spark)
+  )
+  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+  local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}
 
   docker build "${BUILD_ARGS[@]}" \
     -t $(image_ref spark) \
-    -f "$DOCKERFILE" .
+    -f "$BASEDOCKERFILE" .
+
+  docker build "${BINDING_BUILD_ARGS[@]}" \
+    -t $(image_ref spark-py) \
+    -f "$PYDOCKERFILE" .
 }
 
 function push {
@@ -86,7 +94,8 @@ Commands:
   push        Push a pre-built image to a registry. Requires a repository address to be provided.
 
 Options:
-  -f file     Dockerfile to build. By default builds the Dockerfile shipped with Spark.
+  -f file     Dockerfile to build for JVM-based jobs. By default builds the Dockerfile shipped with Spark.
+  -p file     Dockerfile to build for Python jobs, with Python baked in. By default builds the Dockerfile shipped with Spark.
   -r repo     Repository address.
   -t tag      Tag to apply to the built image, or to identify the image to be pushed.
   -m          Use minikube's Docker daemon.
@@ -116,12 +125,14 @@ fi
 
 REPO=
 TAG=
-DOCKERFILE=
+BASEDOCKERFILE=
+PYDOCKERFILE=
-while getopts f:mr:t: option
+while getopts f:p:mr:t: option
 do
  case "${option}"
  in
- f) DOCKERFILE=${OPTARG};;
+ f) BASEDOCKERFILE=${OPTARG};;
+ p) PYDOCKERFILE=${OPTARG};;
  r) REPO=${OPTARG};;
  t) TAG=${OPTARG};;
  m)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 427c797755b84..793c37143391b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -284,8 +284,6 @@ private[spark] class SparkSubmit extends Logging {
       case (STANDALONE, CLUSTER) if args.isR =>
         error("Cluster deploy mode is currently not supported for R " +
           "applications on standalone clusters.")
-      case (KUBERNETES, _) if args.isPython =>
-        error("Python applications are currently not supported for Kubernetes.")
       case (KUBERNETES, _) if args.isR =>
         error("R applications are currently not supported for Kubernetes.")
       case (LOCAL, CLUSTER) =>
@@ -695,9 +693,17 @@ private[spark] class SparkSubmit extends Logging {
     if (isKubernetesCluster) {
       childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
       if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
-        childArgs ++= Array("--primary-java-resource", args.primaryResource)
+        if (args.isPython) {
+          childArgs ++= Array("--primary-py-file", args.primaryResource)
+          childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
+          if (args.pyFiles != null) {
+            childArgs ++= Array("--other-py-files", args.pyFiles)
+          }
+        } else {
+          childArgs ++= Array("--primary-java-resource", args.primaryResource)
+          childArgs ++= Array("--main-class", args.mainClass)
+        }
      }
-      childArgs ++= Array("--main-class", args.mainClass)
       if (args.childArgs != null) {
         args.childArgs.foreach { arg =>
           childArgs += ("--arg", arg)
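With the Kubernetes guard in SparkSubmit removed, a Python application can now be submitted in cluster mode against a Kubernetes master. A minimal sketch of such a submission (the API server address, image name, and file paths below are illustrative placeholders, not values taken from this change):

    bin/spark-submit \
      --master k8s://https://<api-server-host>:6443 \
      --deploy-mode cluster \
      --name spark-pi-py \
      --conf spark.executor.instances=2 \
      --conf spark.kubernetes.container.image=<repo>/spark-py:<tag> \
      --py-files local:///opt/spark/examples/src/main/python/helpers.py \
      local:///opt/spark/examples/src/main/python/pi.py 10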
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 4086970ffb256..f9a4205b3a6e9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -117,6 +117,28 @@ private[spark] object Config extends Logging {
     .stringConf
     .createWithDefault("spark")
 
+  val KUBERNETES_PYSPARK_PY_FILES =
+    ConfigBuilder("spark.kubernetes.python.pyFiles")
+      .doc("The Python files that are distributed via the --py-files client argument.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
+    ConfigBuilder("spark.kubernetes.python.mainAppResource")
+      .doc("The main app resource for PySpark jobs.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_PYSPARK_APP_ARGS =
+    ConfigBuilder("spark.kubernetes.python.appArgs")
+      .doc("The app arguments for PySpark jobs.")
+      .internal()
+      .stringConf
+      .createOptional
+
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")
@@ -154,6 +176,13 @@ private[spark] object Config extends Logging {
     .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
     .createWithDefaultString("1s")
 
+  val MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
+      .doc("The memory overhead factor used when sizing pods: defaults to 0.10 for JVM " +
+        "jobs, and the submission client sets it to 0.40 for non-JVM jobs.")
+      .doubleConf
+      .createWithDefault(0.10)
+
   val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 8da5f24044aad..3884755e3aca7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -71,9 +71,13 @@ private[spark] object Constants {
   val SPARK_CONF_FILE_NAME = "spark.properties"
   val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
 
+  // Bindings
+  val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
+  val ENV_PYSPARK_FILES = "PYSPARK_FILES"
+  val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
+
   // Miscellaneous
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
   val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
-  val MEMORY_OVERHEAD_FACTOR = 0.10
   val MEMORY_OVERHEAD_MIN_MIB = 384L
 }
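Because the overhead factor is now an ordinary config entry rather than the removed MEMORY_OVERHEAD_FACTOR constant, it can be tuned per submission. The arithmetic, with illustrative numbers (the 384 MiB floor is MEMORY_OVERHEAD_MIN_MIB from Constants.scala):

    # overhead = max(factor * memory, 384 MiB)
    #   4g JVM driver:            max(0.10 * 4096, 384) = 409 MiB
    #   4g Python driver (0.40):  max(0.40 * 4096, 384) = 1638 MiB
    # explicit override at submission time:
    --conf spark.kubernetes.memoryOverheadFactor=0.25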
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 77b634ddfabcc..0e28de97b0567 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -16,14 +16,17 @@
  */
 package org.apache.spark.deploy.k8s
 
+import scala.collection.mutable
+
 import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
+import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config.ConfigEntry
+
 
 private[spark] sealed trait KubernetesRoleSpecificConf
 
 /*
@@ -54,7 +57,8 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
     roleLabels: Map[String, String],
     roleAnnotations: Map[String, String],
     roleSecretNamesToMountPaths: Map[String, String],
-    roleEnvs: Map[String, String]) {
+    roleEnvs: Map[String, String],
+    sparkFiles: Seq[String]) {
 
   def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
 
@@ -63,10 +67,14 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
     .map(str => str.split(",").toSeq)
     .getOrElse(Seq.empty[String])
 
-  def sparkFiles(): Seq[String] = sparkConf
-    .getOption("spark.files")
-    .map(str => str.split(",").toSeq)
-    .getOrElse(Seq.empty[String])
+  def pyFiles(): Option[String] = sparkConf
+    .get(KUBERNETES_PYSPARK_PY_FILES)
+
+  def pySparkMainResource(): Option[String] = sparkConf
+    .get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)
+
+  def pySparkAppArgs(): Option[String] = sparkConf
+    .get(KUBERNETES_PYSPARK_APP_ARGS)
 
   def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
 
@@ -101,17 +109,29 @@ private[spark] object KubernetesConf {
       appId: String,
       mainAppResource: Option[MainAppResource],
       mainClass: String,
-      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+      appArgs: Array[String],
+      maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
     val sparkConfWithMainAppJar = sparkConf.clone()
+    val additionalFiles = mutable.ArrayBuffer.empty[String]
     mainAppResource.foreach {
-      case JavaMainAppResource(res) =>
-        val previousJars = sparkConf
-          .getOption("spark.jars")
-          .map(_.split(","))
-          .getOrElse(Array.empty)
-        if (!previousJars.contains(res)) {
-          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
-        }
+        case JavaMainAppResource(res) =>
+          val previousJars = sparkConf
+            .getOption("spark.jars")
+            .map(_.split(","))
+            .getOrElse(Array.empty)
+          if (!previousJars.contains(res)) {
+            sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+          }
+        case nonJVM: NonJVMResource =>
+          nonJVM match {
+            case PythonMainAppResource(res) =>
+              additionalFiles += res
+              maybePyFiles.foreach(pyFileList =>
+                additionalFiles.appendAll(pyFileList.split(",")))
+              sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+              sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_APP_ARGS, appArgs.mkString(" "))
+          }
+          sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
     }
 
     val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
@@ -132,6 +152,11 @@ private[spark] object KubernetesConf {
     val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
 
+    val sparkFiles = sparkConf
+      .getOption("spark.files")
+      .map(str => str.split(",").toSeq)
+      .getOrElse(Seq.empty[String]) ++ additionalFiles
+
     KubernetesConf(
       sparkConfWithMainAppJar,
       KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
@@ -140,7 +165,8 @@ private[spark] object KubernetesConf {
       driverLabels,
       driverAnnotations,
       driverSecretNamesToMountPaths,
-      driverEnvs)
+      driverEnvs,
+      sparkFiles)
   }
 
   def createExecutorConf(
@@ -179,6 +205,7 @@ private[spark] object KubernetesConf {
       executorLabels,
       executorAnnotations,
       executorSecrets,
-      executorEnv)
+      executorEnv,
+      Seq.empty[String])
   }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index ee629068ad90d..593fb531a004d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -52,7 +52,7 @@ private[spark] object KubernetesUtils {
     }
   }
 
-  private def resolveFileUri(uri: String): String = {
+  def resolveFileUri(uri: String): String = {
     val fileUri = Utils.resolveURI(uri)
     val fileScheme = Option(fileUri.getScheme).getOrElse("file")
     fileScheme match {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 07bdccbe0479d..0e2f279c189c9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -25,6 +25,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 
@@ -44,11 +45,16 @@ private[spark] class BasicDriverFeatureStep(
   private val driverCpuCores = conf.get("spark.driver.cores", "1")
   private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)
 
+  private val driverDockerContainer = conf.roleSpecificConf.mainAppResource.map {
+    case JavaMainAppResource(_) => "driver"
+    case PythonMainAppResource(_) => "driver-py"
+  }.getOrElse(throw new SparkException("Must specify a JVM or Python Resource"))
   // Memory settings
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
+    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
+      MEMORY_OVERHEAD_MIN_MIB))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
 
   override def configurePod(pod: SparkPod): SparkPod = {
@@ -71,7 +77,7 @@ private[spark] class BasicDriverFeatureStep(
       ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
     }
 
-    val driverContainer = new ContainerBuilder(pod.container)
+    val withoutArgsDriverContainer: ContainerBuilder = new ContainerBuilder(pod.container)
       .withName(DRIVER_CONTAINER_NAME)
       .withImage(driverContainerImage)
       .withImagePullPolicy(conf.imagePullPolicy())
@@ -88,15 +94,22 @@ private[spark] class BasicDriverFeatureStep(
       .addToRequests("memory", driverMemoryQuantity)
       .addToLimits("memory", driverMemoryQuantity)
       .endResources()
-      .addToArgs("driver")
+      .addToArgs(driverDockerContainer)
       .addToArgs("--properties-file", SPARK_CONF_PATH)
       .addToArgs("--class", conf.roleSpecificConf.mainClass)
-      // The user application jar is merged into the spark.jars list and managed through that
-      // property, so there is no need to reference it explicitly here.
-      .addToArgs(SparkLauncher.NO_RESOURCE)
-      .addToArgs(conf.roleSpecificConf.appArgs: _*)
-      .build()
+    val driverContainer =
+      if (driverDockerContainer == "driver-py") {
+        withoutArgsDriverContainer
+          .build()
+      } else {
+        // The user application jar is merged into the spark.jars list and managed through that
+        // property, so there is no need to reference it explicitly here.
+        withoutArgsDriverContainer
+          .addToArgs(SparkLauncher.NO_RESOURCE)
+          .addToArgs(conf.roleSpecificConf.appArgs: _*)
+          .build()
+      }
 
     val driverPod = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(driverPodName)
@@ -122,7 +135,7 @@ private[spark] class BasicDriverFeatureStep(
     val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
       conf.sparkJars())
     val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
-      conf.sparkFiles())
+      conf.sparkFiles)
     if (resolvedSparkJars.nonEmpty) {
       additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
     }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index d22097587aafe..79d467eadd30f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -54,7 +54,7 @@ private[spark] class BasicExecutorFeatureStep(
 
   private val memoryOverheadMiB = kubernetesConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+    .getOrElse(math.max((kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
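The practical difference between the two branches above is only the first argument handed to the container entrypoint and whether the resource and app arguments appear as container args at all; for Python they travel as environment variables set by the PythonDriverFeatureStep below. Roughly, with illustrative paths:

    # JVM binding:
    #   driver --properties-file /opt/spark/conf/spark.properties \
    #     --class org.example.MyApp spark-internal arg1 arg2
    # Python binding:
    #   driver-py --properties-file /opt/spark/conf/spark.properties \
    #     --class org.apache.spark.deploy.PythonRunner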
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
new file mode 100644
index 0000000000000..295d7093c5952
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import io.fabric8.kubernetes.api.model.ContainerBuilder
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants.{ENV_PYSPARK_ARGS, ENV_PYSPARK_FILES, ENV_PYSPARK_PRIMARY}
+import org.apache.spark.deploy.k8s.KubernetesUtils
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+
+private[spark] class PythonDriverFeatureStep(
+  kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val mainResource = kubernetesConf.pySparkMainResource()
+    require(mainResource.isDefined, "PySpark Main Resource must be defined")
+    val otherPyFiles = kubernetesConf.pyFiles().map(pyFile =>
+      KubernetesUtils.resolveFileUrisAndPath(pyFile.split(","))
+        .mkString(",")).getOrElse("")
+    val withPythonPrimaryFileContainer = new ContainerBuilder(pod.container)
+      .addNewEnv()
+        .withName(ENV_PYSPARK_ARGS)
+        .withValue(kubernetesConf.pySparkAppArgs().getOrElse(""))
+        .endEnv()
+      .addNewEnv()
+        .withName(ENV_PYSPARK_PRIMARY)
+        .withValue(KubernetesUtils.resolveFileUri(mainResource.get))
+        .endEnv()
+      .addNewEnv()
+        .withName(ENV_PYSPARK_FILES)
+        .withValue(otherPyFiles)
+        .endEnv()
+      .build()
+    SparkPod(pod.pod, withPythonPrimaryFileContainer)
+  }
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}
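For a concrete sense of what this step produces: given a primary resource local:///opt/spark/pi.py, --py-files local:///opt/spark/helpers.py, and app args "5 7" (all illustrative), the driver container would end up with roughly these environment variables, the local:// URIs having been flattened by resolveFileUri:

    PYSPARK_PRIMARY=/opt/spark/pi.py
    PYSPARK_FILES=/opt/spark/helpers.py
    PYSPARK_APP_ARGS=5 7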
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index a97f5650fb869..30475108fd2c0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -39,11 +39,13 @@ import org.apache.spark.util.Utils
  * @param mainAppResource the main application resource if any
  * @param mainClass the main class of the application to run
  * @param driverArgs arguments to the driver
+ * @param maybePyFiles additional Python files passed via --py-files
  */
 private[spark] case class ClientArguments(
     mainAppResource: Option[MainAppResource],
     mainClass: String,
-    driverArgs: Array[String])
+    driverArgs: Array[String],
+    maybePyFiles: Option[String])
 
 private[spark] object ClientArguments {
 
@@ -51,10 +53,15 @@ private[spark] object ClientArguments {
     var mainAppResource: Option[MainAppResource] = None
     var mainClass: Option[String] = None
     val driverArgs = mutable.ArrayBuffer.empty[String]
+    var maybePyFiles: Option[String] = None
 
     args.sliding(2, 2).toList.foreach {
       case Array("--primary-java-resource", primaryJavaResource: String) =>
         mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+      case Array("--primary-py-file", primaryPythonResource: String) =>
+        mainAppResource = Some(PythonMainAppResource(primaryPythonResource))
+      case Array("--other-py-files", pyFiles: String) =>
+        maybePyFiles = Some(pyFiles)
       case Array("--main-class", clazz: String) =>
         mainClass = Some(clazz)
       case Array("--arg", arg: String) =>
@@ -69,7 +76,8 @@ private[spark] object ClientArguments {
 
     ClientArguments(
       mainAppResource,
       mainClass.get,
-      driverArgs.toArray)
+      driverArgs.toArray,
+      maybePyFiles)
   }
 }
 
@@ -206,6 +214,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     val kubernetesResourceNamePrefix = {
       s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
     }
+    sparkConf.set("spark.kubernetes.python.pyFiles", clientArguments.maybePyFiles.getOrElse(""))
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
       appName,
@@ -213,7 +222,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
       kubernetesAppId,
       clientArguments.mainAppResource,
       clientArguments.mainClass,
-      clientArguments.driverArgs)
+      clientArguments.driverArgs,
+      clientArguments.maybePyFiles)
     val builder = new KubernetesDriverBuilder
     val namespace = kubernetesConf.namespace()
     // The master URL has been checked for validity already in SparkSubmit.
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index c7579ed8cb689..6d7b644a2ee77 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -18,6 +18,8 @@ package org.apache.spark.deploy.k8s.submit
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
 import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+import org.apache.spark.deploy.k8s.features.bindings.PythonDriverFeatureStep
 
 private[spark] class KubernetesDriverBuilder(
     provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -29,7 +31,11 @@ private[spark] class KubernetesDriverBuilder(
       new DriverServiceFeatureStep(_),
     provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => MountSecretsFeatureStep) =
-      new MountSecretsFeatureStep(_)) {
+      new MountSecretsFeatureStep(_),
+    providePythonStep: (
+      KubernetesConf[_ <: KubernetesRoleSpecificConf]
+        => PythonDriverFeatureStep) =
+      new PythonDriverFeatureStep(_)) {
 
   def buildFromFeatures(
     kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
@@ -37,10 +43,18 @@ private[spark] class KubernetesDriverBuilder(
       provideBasicStep(kubernetesConf),
       provideCredentialsStep(kubernetesConf),
       provideServiceStep(kubernetesConf))
-    val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
-      baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
-    } else baseFeatures
-
+    val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      Some(provideSecretsStep(kubernetesConf)) } else None
+    val maybeNonJVMBindings =
+      kubernetesConf.roleSpecificConf.mainAppResource match {
+        case Some(PythonMainAppResource(_)) =>
+          Some(providePythonStep(kubernetesConf))
+        case _ => None
+      }
+    val allFeatures: Seq[KubernetesFeatureConfigStep] =
+      baseFeatures ++
+        maybeRoleSecretNamesStep.toSeq ++
+        maybeNonJVMBindings.toSeq
     var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
     for (feature <- allFeatures) {
       val configuredPod = feature.configurePod(spec.pod)
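End to end, the internal handoff looks like this: SparkSubmit emits flag/value pairs, and fromCommandLineArgs consumes them with args.sliding(2, 2). For the submission sketched earlier, the child arguments would be roughly:

    --primary-py-file local:///opt/spark/examples/src/main/python/pi.py \
    --main-class org.apache.spark.deploy.PythonRunner \
    --other-py-files local:///opt/spark/examples/src/main/python/helpers.py \
    --arg 10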
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
index cca9f4627a1f6..cbe081ae35683 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
@@ -18,4 +18,9 @@ package org.apache.spark.deploy.k8s.submit
 
 private[spark] sealed trait MainAppResource
 
+private[spark] sealed trait NonJVMResource
+
 private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource
+
+private[spark] case class PythonMainAppResource(primaryResource: String)
+  extends MainAppResource with NonJVMResource
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 22568fe7ea3be..837fb503f63ba 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
 
 private[spark] class KubernetesExecutorBuilder(
     provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
@@ -29,9 +30,11 @@ private[spark] class KubernetesExecutorBuilder(
   def buildFromFeatures(
     kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
     val baseFeatures = Seq(provideBasicStep(kubernetesConf))
-    val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
-      baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
-    } else baseFeatures
+    val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      Some(provideSecretsStep(kubernetesConf)) } else None
+    val allFeatures: Seq[KubernetesFeatureConfigStep] =
+      baseFeatures ++
+        maybeRoleSecretNamesStep.toSeq
     var executorPod = SparkPod.initialPod()
     for (feature <- allFeatures) {
       executorPod = feature.configurePod(executorPod)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index f10202f7a3546..a4587cbe37f32 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{LocalObjectReferenceBuilder, PodBuilder}
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.deploy.k8s.submit._
 
 class KubernetesConfSuite extends SparkFunSuite {
 
@@ -55,7 +55,8 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_ID,
       None,
       MAIN_CLASS,
-      APP_ARGS)
+      APP_ARGS,
+      None)
     assert(conf.appId === APP_ID)
     assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap)
     assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX)
@@ -76,7 +77,8 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_ID,
       mainAppJar,
       MAIN_CLASS,
-      APP_ARGS)
+      APP_ARGS,
+      None)
     assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars")
       .split(",")
       === Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar"))
@@ -87,11 +89,37 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_ID,
       None,
       MAIN_CLASS,
-      APP_ARGS)
+      APP_ARGS,
+      None)
     assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
       === Array("local:///opt/spark/jar1.jar"))
+    assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1)
   }
 
+  test("Creating driver conf with a python primary file") {
+    val mainResourceFile = "local:///opt/spark/main.py"
+    val inputPyFiles = Array("local:///opt/spark/example2.py", "local:///example3.py")
+    val sparkConf = new SparkConf(false)
+      .setJars(Seq("local:///opt/spark/jar1.jar"))
+      .set("spark.files", "local:///opt/spark/example4.py")
+    val mainAppResource = Some(PythonMainAppResource(mainResourceFile))
+    val kubernetesConfWithMainResource = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      mainAppResource,
+      MAIN_CLASS,
+      APP_ARGS,
+      Some(inputPyFiles.mkString(",")))
+    assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
+      === Array("local:///opt/spark/jar1.jar"))
+    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
+    assert(kubernetesConfWithMainResource.sparkFiles
+      === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles)
+  }
+
+
   test("Resolve driver labels, annotations, secret mount paths, and envs.") {
     val sparkConf = new SparkConf(false)
     CUSTOM_LABELS.foreach { case (key, value) =>
@@ -114,7 +142,8 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_ID,
       None,
       MAIN_CLASS,
-      APP_ARGS)
+      APP_ARGS,
+      None)
     assert(conf.roleLabels === Map(
       SPARK_APP_ID_LABEL -> APP_ID,
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index eee85b8baa730..23fb27cc666b2 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -24,6 +24,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
 
 class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
@@ -33,6 +35,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
   private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
   private val APP_NAME = "spark-test"
   private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
+  private val PY_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner"
   private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"")
   private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
   private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
@@ -60,7 +63,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val kubernetesConf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
-        None,
+        Some(JavaMainAppResource("")),
         APP_NAME,
         MAIN_CLASS,
         APP_ARGS),
@@ -69,7 +72,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       DRIVER_LABELS,
       DRIVER_ANNOTATIONS,
       Map.empty,
-      DRIVER_ENVS)
+      DRIVER_ENVS,
+      Seq.empty[String])
 
     val featureStep = new BasicDriverFeatureStep(kubernetesConf)
     val basePod = SparkPod.initialPod()
@@ -109,7 +113,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
     assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS)
     assert(configuredPod.pod.getSpec.getRestartPolicy === "Never")
-
     val expectedSparkConf = Map(
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
       "spark.app.id" -> APP_ID,
@@ -118,6 +121,47 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
   }
 
+  test("Check appropriate entrypoint rerouting for various bindings") {
+    val sparkConf = new SparkConf()
+      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g")
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+    val javaKubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(JavaMainAppResource("")),
+        APP_NAME,
+        PY_MAIN_CLASS,
+        APP_ARGS),
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      DRIVER_LABELS,
+      DRIVER_ANNOTATIONS,
+      Map.empty,
+      DRIVER_ENVS,
+      Seq.empty[String])
+    val pythonKubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(PythonMainAppResource("")),
+        APP_NAME,
+        PY_MAIN_CLASS,
+        APP_ARGS),
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      DRIVER_LABELS,
+      DRIVER_ANNOTATIONS,
+      Map.empty,
+      DRIVER_ENVS,
+      Seq.empty[String])
+    val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf)
+    val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf)
+    val basePod = SparkPod.initialPod()
+    val configuredJavaPod = javaFeatureStep.configurePod(basePod)
+    val configuredPythonPod = pythonFeatureStep.configurePod(basePod)
+    assert(configuredJavaPod.container.getArgs.get(0) === "driver")
+    assert(configuredPythonPod.container.getArgs.get(0) === "driver-py")
+  }
+
   test("Additional system properties resolve jars and set cluster-mode confs.") {
     val allJars = Seq("local:///opt/spark/jar1.jar", "hdfs:///opt/spark/jar2.jar")
     val allFiles = Seq("https://localhost:9000/file1.txt", "local:///opt/spark/file2.txt")
@@ -129,7 +173,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val kubernetesConf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
-        None,
+        Some(JavaMainAppResource("")),
         APP_NAME,
         MAIN_CLASS,
         APP_ARGS),
@@ -138,7 +182,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       DRIVER_LABELS,
       DRIVER_ANNOTATIONS,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      allFiles)
     val step = new BasicDriverFeatureStep(kubernetesConf)
     val additionalProperties = step.getAdditionalPodSystemProperties()
     val expectedSparkConf = Map(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index a764f7630b5c8..e1a6b7a105b2b 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -87,7 +87,8 @@ class BasicExecutorFeatureStepSuite
         LABELS,
         ANNOTATIONS,
         Map.empty,
-        Map.empty))
+        Map.empty,
+        Seq.empty[String]))
     val executor = step.configurePod(SparkPod.initialPod())
 
     // The executor pod name and default labels.
@@ -124,7 +125,8 @@ class BasicExecutorFeatureStepSuite
         LABELS,
         ANNOTATIONS,
         Map.empty,
-        Map.empty))
+        Map.empty,
+        Seq.empty[String]))
     assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
   }
 
@@ -142,7 +144,8 @@ class BasicExecutorFeatureStepSuite
         LABELS,
         ANNOTATIONS,
         Map.empty,
-        Map("qux" -> "quux")))
+        Map("qux" -> "quux"),
+        Seq.empty[String]))
     val executor = step.configurePod(SparkPod.initialPod())
 
     checkEnv(executor,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
index 9f817d3bfc79a..775d78ecf649d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -59,7 +59,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
     assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty)
@@ -88,7 +89,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
 
     assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
@@ -124,7 +126,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
     val expectedSparkConf = Map(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index c299d56865ec0..498730d5c6d40 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -65,7 +65,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         DRIVER_LABELS,
         Map.empty,
         Map.empty,
-        Map.empty))
+        Map.empty,
+        Seq.empty[String]))
     assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
     assert(configurationStep.getAdditionalKubernetesResources().size === 1)
     assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service])
@@ -94,7 +95,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         DRIVER_LABELS,
         Map.empty,
         Map.empty,
-        Map.empty))
+        Map.empty,
+        Seq.empty[String]))
     val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
       DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
     val expectedHostName = s"$expectedServiceName.my-namespace.svc"
@@ -113,7 +115,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         DRIVER_LABELS,
         Map.empty,
         Map.empty,
-        Map.empty))
+        Map.empty,
+        Seq.empty[String]))
     val resolvedService = configurationStep
       .getAdditionalKubernetesResources()
       .head
@@ -141,7 +144,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         DRIVER_LABELS,
         Map.empty,
         Map.empty,
-        Map.empty),
+        Map.empty,
+        Seq.empty[String]),
       clock)
     val driverService = configurationStep
       .getAdditionalKubernetesResources()
      .head
@@ -166,7 +170,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
          DRIVER_LABELS,
          Map.empty,
          Map.empty,
-          Map.empty),
+          Map.empty,
+          Seq.empty[String]),
        clock)
      fail("The driver bind address should not be allowed.")
    } catch {
@@ -189,7 +194,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
          DRIVER_LABELS,
          Map.empty,
          Map.empty,
-          Map.empty),
+          Map.empty,
+          Seq.empty[String]),
        clock)
      fail("The driver host address should not be allowed.")
    } catch {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
index 9d02f56cc206d..847a313b619b5 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -41,7 +41,8 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       secretNamesToMountPaths,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
 
     val step = new MountSecretsFeatureStep(kubernetesConf)
     val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
new file mode 100644
index 0000000000000..f84d5e2adb35f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+class PythonDriverFeatureStepSuite extends SparkFunSuite {
+
+
+  test("Python Step modifies container correctly") {
+    val expectedMainResource = "/main.py"
+    val mainResource = "local:///main.py"
+    val pyFiles = Seq("local:///example2.py", "local:///example3.py")
+    val expectedPySparkFiles =
+      "/example2.py,/example3.py"
+    val baseDriverPod = SparkPod.initialPod()
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
+      .set(KUBERNETES_PYSPARK_PY_FILES, pyFiles.mkString(","))
+      .set("spark.files", "local:///example.py")
+      .set(KUBERNETES_PYSPARK_APP_ARGS, "5 7")
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(PythonMainAppResource("local:///main.py")),
+        "test-app",
+        "python-runner",
+        Seq.empty[String]),
+      "",
+      "",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Seq.empty[String])
+
+    val step = new PythonDriverFeatureStep(kubernetesConf)
+    val driverPod = step.configurePod(baseDriverPod).pod
+    val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
+    assert(driverContainerwithPySpark.getEnv.size === 3)
+    val envs = driverContainerwithPySpark
+      .getEnv
+      .asScala
+      .map(env => (env.getName, env.getValue))
+      .toMap
+    assert(envs(ENV_PYSPARK_PRIMARY) === expectedMainResource)
+    assert(envs(ENV_PYSPARK_FILES) === expectedPySparkFiles)
+    assert(envs(ENV_PYSPARK_ARGS) === "5 7")
+  }
+  test("Python Step testing empty pyfiles") {
+    val mainResource = "local:///main.py"
+    val baseDriverPod = SparkPod.initialPod()
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(PythonMainAppResource("local:///main.py")),
+        "test-app",
+        "python-runner",
+        Seq.empty[String]),
+      "",
+      "",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Seq.empty[String])
+    val step = new PythonDriverFeatureStep(kubernetesConf)
+    val driverPod = step.configurePod(baseDriverPod).pod
+    val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
+    assert(driverContainerwithPySpark.getEnv.size === 3)
+    val envs = driverContainerwithPySpark
+      .getEnv
+      .asScala
+      .map(env => (env.getName, env.getValue))
+      .toMap
+    assert(envs(ENV_PYSPARK_FILES) === "")
+    assert(envs(ENV_PYSPARK_ARGS) === "")
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index c1b203e03a357..d66f7ef2e0b27 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -142,7 +142,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(podOperations.withName(POD_NAME)).thenReturn(namedPods)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 161f9afe7bba9..f0b1b1b82b7a3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
 import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.bindings.PythonDriverFeatureStep
 
 class KubernetesDriverBuilderSuite extends SparkFunSuite {
 
@@ -26,6 +27,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val CREDENTIALS_STEP_TYPE = "credentials"
   private val SERVICE_STEP_TYPE = "service"
   private val SECRETS_STEP_TYPE = "mount-secrets"
+  private val PYSPARK_STEP_TYPE = "pyspark-bindings"
 
   private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep])
@@ -39,12 +41,16 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
 
+  private val pythonStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    PYSPARK_STEP_TYPE, classOf[PythonDriverFeatureStep])
+
   private val builderUnderTest: KubernetesDriverBuilder = new KubernetesDriverBuilder(
     _ => basicFeatureStep,
     _ => credentialsStep,
     _ => serviceStep,
-    _ => secretsStep)
+    _ => secretsStep,
+    _ => pythonStep)
 
   test("Apply fundamental steps all the time.") {
     val conf = KubernetesConf(
@@ -59,7 +65,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
       BASIC_STEP_TYPE,
@@ -80,7 +87,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Map("secret" -> "secretMountPath"),
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
       BASIC_STEP_TYPE,
@@ -89,6 +97,29 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       SECRETS_STEP_TYPE)
   }
 
+  test("Apply Python step if main resource is python.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesDriverSpecificConf(
+        Some(PythonMainAppResource("example.py")),
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Seq.empty[String])
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE,
+      PYSPARK_STEP_TYPE)
+  }
+
   private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
     : Unit = {
     assert(resolvedSpec.systemProperties.size === stepTypes.size)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index f5270623f8acc..7878e0e91e45a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -45,7 +45,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     validateStepTypesApplied(builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE)
   }
 
@@ -59,7 +60,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Map("secret" -> "secretMountPath"),
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
       BASIC_STEP_TYPE,
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
new file mode 100644
index 0000000000000..a36dc87dd8027
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG base_img
+FROM $base_img
+WORKDIR /
+COPY python /opt/spark/python
+RUN apk add --no-cache python && \
+    python -m ensurepip && \
+    rm -r /usr/lib/python*/ensurepip && \
+    pip install --upgrade pip setuptools && \
+    rm -r /root/.cache
+ENV PYTHON_VERSION 2.7.13
+ENV PYSPARK_PYTHON python
+ENV PYSPARK_DRIVER_PYTHON python
+ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.6-src.zip:${PYTHONPATH}
+
+WORKDIR /opt/spark/work-dir
+ENTRYPOINT [ "/opt/entrypoint.sh" ]
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 3e166116aa3fd..27840bfe635e5 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -53,6 +53,11 @@ if [ -n "$SPARK_MOUNTED_FILES_DIR" ]; then
   cp -R "$SPARK_MOUNTED_FILES_DIR/." .
 fi
 
+PYSPARK_SECONDARY="$PYSPARK_APP_ARGS"
+if [ ! -z "$PYSPARK_FILES" ]; then
+  PYSPARK_SECONDARY="$PYSPARK_FILES $PYSPARK_APP_ARGS"
+fi
+
 case "$SPARK_K8S_CMD" in
   driver)
     CMD=(
@@ -62,6 +67,14 @@ case "$SPARK_K8S_CMD" in
       "$@"
     )
     ;;
+  driver-py)
+    CMD=(
+      "$SPARK_HOME/bin/spark-submit"
+      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
+      --deploy-mode client
+      "$@" $PYSPARK_PRIMARY $PYSPARK_SECONDARY
+    )
+    ;;
   executor)
     CMD=(
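After the case statement selects the driver-py branch, CMD is handed to the exec at the bottom of the script (not shown in this hunk). The net command, with illustrative values substituted for the environment variables, would be roughly:

    /opt/spark/bin/spark-submit \
      --conf spark.driver.bindAddress=10.0.0.5 \
      --deploy-mode client \
      --properties-file /opt/spark/conf/spark.properties \
      --class org.apache.spark.deploy.PythonRunner \
      /opt/spark/pi.py /opt/spark/helpers.py 5 7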