diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index f090240065bf1..a871ab5d448c3 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -63,12 +63,20 @@ function build {
if [ ! -d "$IMG_PATH" ]; then
error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
fi
-
- local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+ local BINDING_BUILD_ARGS=(
+ --build-arg
+ base_img=$(image_ref spark)
+ )
+ local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+ local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}
docker build "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
- -f "$DOCKERFILE" .
+ -f "$BASEDOCKERFILE" .
+
+ docker build "${BINDING_BUILD_ARGS[@]}" \
+ -t $(image_ref spark-py) \
+ -f "$PYDOCKERFILE" .
}
function push {
@@ -86,7 +94,8 @@ Commands:
push Push a pre-built image to a registry. Requires a repository address to be provided.
Options:
- -f file Dockerfile to build. By default builds the Dockerfile shipped with Spark.
+ -f file Dockerfile to build for JVM-based jobs. By default builds the Dockerfile shipped with Spark.
+ -p file Dockerfile to build for PySpark jobs, with Python bindings baked in. By default builds the Dockerfile shipped with Spark.
-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
@@ -116,12 +125,14 @@ fi
REPO=
TAG=
-DOCKERFILE=
+BASEDOCKERFILE=
+PYDOCKERFILE=
-while getopts f:mr:t: option
+while getopts f:p:mr:t: option
do
case "${option}"
in
- f) DOCKERFILE=${OPTARG};;
+ f) BASEDOCKERFILE=${OPTARG};;
+ p) PYDOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
m)
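With the -p option wired into both getopts and the case statement above, one invocation of the tool builds the JVM base image and the Python-enabled spark-py image together. A sketch of typical usage; the repository, tag, and custom Dockerfile path are illustrative:

    # Build the base Spark image and the spark-py image from a runnable distribution.
    ./bin/docker-image-tool.sh -r docker.example.com/spark -t v2.4.0 build

    # Point -p at a custom Python Dockerfile instead of the bundled default.
    ./bin/docker-image-tool.sh -r docker.example.com/spark -t v2.4.0 \
      -p path/to/custom/python/Dockerfile build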
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a46af26feb061..e83d82f847c61 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -285,8 +285,6 @@ private[spark] class SparkSubmit extends Logging {
case (STANDALONE, CLUSTER) if args.isR =>
error("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
- case (KUBERNETES, _) if args.isPython =>
- error("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
error("R applications are currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
@@ -694,9 +692,17 @@ private[spark] class SparkSubmit extends Logging {
if (isKubernetesCluster) {
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
- childArgs ++= Array("--primary-java-resource", args.primaryResource)
+ if (args.isPython) {
+ childArgs ++= Array("--primary-py-file", args.primaryResource)
+ childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
+ if (args.pyFiles != null) {
+ childArgs ++= Array("--other-py-files", args.pyFiles)
+ }
+ } else {
+ childArgs ++= Array("--primary-java-resource", args.primaryResource)
+ childArgs ++= Array("--main-class", args.mainClass)
+ }
}
- childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += ("--arg", arg)
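With the Python restriction removed above, a PySpark application can now be submitted to Kubernetes in cluster mode; SparkSubmit routes it through the --primary-py-file and PythonRunner main-class child arguments added here. An illustrative submission, with placeholder master URL and image name, assuming the bundled examples sit under /opt/spark/examples inside the image:

    bin/spark-submit \
      --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
      --deploy-mode cluster \
      --name spark-pi \
      --conf spark.executor.instances=2 \
      --conf spark.kubernetes.container.image=<spark-py-image> \
      local:///opt/spark/examples/src/main/python/pi.py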
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index a4b2b98b0b649..23ef934a5fb30 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -270,7 +270,6 @@ future versions of the spark-kubernetes integration.
Some of these include:
-* PySpark
* R
* Dynamic Executor Scaling
* Local File Dependency Management
@@ -624,4 +623,19 @@ specific to Spark on Kubernetes.
spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key.
+<tr>
+ <td><code>spark.kubernetes.memoryOverheadFactor</code></td>
+ <td><code>0.1</code></td>
+ <td>
+ This sets the memory overhead factor that allocates memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, and various system processes. It defaults to 0.10 for JVM-based jobs and to 0.40 for non-JVM jobs.
+ This is done because non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This preempts the error with a higher default.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.kubernetes.pyspark.pythonversion</code></td>
+ <td><code>"2"</code></td>
+ <td>
+ This sets the major Python version of the docker image used to run the driver and executor containers. It can be either 2 or 3.
+ </td>
+</tr>
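Both new entries are ordinary Spark configuration properties, so they can be supplied per submission with --conf. An illustrative invocation that requests Python 3 and pins the non-JVM overhead default explicitly (master URL and image name are placeholders):

    bin/spark-submit \
      --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
      --deploy-mode cluster \
      --conf spark.kubernetes.container.image=<spark-py-image> \
      --conf spark.kubernetes.pyspark.pythonversion=3 \
      --conf spark.kubernetes.memoryOverheadFactor=0.4 \
      local:///opt/spark/examples/src/main/python/pi.py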
diff --git a/examples/src/main/python/py_container_checks.py b/examples/src/main/python/py_container_checks.py
new file mode 100644
index 0000000000000..f6b3be2806c82
--- /dev/null
+++ b/examples/src/main/python/py_container_checks.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+
+
+def version_check(python_env, major_python_version):
+ """
+ Various checks used to validate the Python container image.
+ This file will be distributed via --py-files in the e2e tests.
+ """
+ env_version = os.environ.get('PYSPARK_PYTHON')
+ print("Python runtime version check is: " +
+ str(sys.version_info[0] == major_python_version))
+
+ print("Python environment version check is: " +
+ str(env_version == python_env))
diff --git a/examples/src/main/python/pyfiles.py b/examples/src/main/python/pyfiles.py
new file mode 100644
index 0000000000000..4193654b49a12
--- /dev/null
+++ b/examples/src/main/python/pyfiles.py
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+
+
+if __name__ == "__main__":
+ """
+ Usage: pyfiles [major_python_version]
+ """
+ spark = SparkSession \
+ .builder \
+ .appName("PyFilesTest") \
+ .getOrCreate()
+
+ from py_container_checks import version_check
+ # Run the Python container checks
+ version_check(sys.argv[1], 2 if sys.argv[1] == "python" else 3)
+
+ spark.stop()
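The two example files above are meant to be used together: py_container_checks.py travels as a --py-files dependency and is imported by pyfiles.py inside the driver container. A sketch of the intended e2e submission; per the code above, passing "python" as the application argument selects the Python 2 checks, while "python3" would select Python 3:

    bin/spark-submit \
      --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
      --deploy-mode cluster \
      --conf spark.kubernetes.container.image=<spark-py-image> \
      --py-files local:///opt/spark/examples/src/main/python/py_container_checks.py \
      local:///opt/spark/examples/src/main/python/pyfiles.py python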
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 560dedf431b08..590deaa72e7ee 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -117,6 +117,28 @@ private[spark] object Config extends Logging {
.stringConf
.createWithDefault("spark")
+ val KUBERNETES_PYSPARK_PY_FILES =
+ ConfigBuilder("spark.kubernetes.python.pyFiles")
+ .doc("The PyFiles that are distributed via client arguments")
+ .internal()
+ .stringConf
+ .createOptional
+
+ val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
+ ConfigBuilder("spark.kubernetes.python.mainAppResource")
+ .doc("The main app resource for pyspark jobs")
+ .internal()
+ .stringConf
+ .createOptional
+
+ val KUBERNETES_PYSPARK_APP_ARGS =
+ ConfigBuilder("spark.kubernetes.python.appArgs")
+ .doc("The app arguments for PySpark Jobs")
+ .internal()
+ .stringConf
+ .createOptional
+
val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
@@ -154,6 +176,24 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")
+ val MEMORY_OVERHEAD_FACTOR =
+ ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
+ .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
+ "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
+ .doubleConf
+ .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
+ "Ensure that memory overhead is a double between 0 --> 1.0")
+ .createWithDefault(0.1)
+
+ val PYSPARK_MAJOR_PYTHON_VERSION =
+ ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
+ .doc("This sets the major Python version. Either 2 or 3. (Python2 or Python3)")
+ .stringConf
+ .checkValue(pv => List("2", "3").contains(pv),
+ "Ensure that major Python version is either Python2 or Python3")
+ .createWithDefault("2")
+
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 8da5f24044aad..69bd03d1eda6f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -71,9 +71,14 @@ private[spark] object Constants {
val SPARK_CONF_FILE_NAME = "spark.properties"
val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
+ // BINDINGS
+ val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
+ val ENV_PYSPARK_FILES = "PYSPARK_FILES"
+ val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
+ val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
+
// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
- val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN_MIB = 384L
}
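The four new ENV_PYSPARK_* variables are populated by the PythonDriverFeatureStep introduced later in this change and are meant to be read by the container entrypoint when it is invoked with the driver-py argument. The entrypoint itself is not part of this diff, so the following is only a minimal sketch of how it might consume them; SPARK_K8S_CMD and the exec line are assumptions:

    case "$SPARK_K8S_CMD" in
      driver-py)
        # PYSPARK_FILES is already ":"-delimited, so it can be appended to PYTHONPATH as-is.
        export PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
        # PYSPARK_MAJOR_PYTHON_VERSION ("2" or "3") would choose the python binary here.
        exec "$SPARK_HOME/bin/spark-submit" --deploy-mode client "$@" \
          "$PYSPARK_PRIMARY" $PYSPARK_APP_ARGS
        ;;
    esac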
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 5a944187a7096..b0ccaa36b01ed 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -16,14 +16,17 @@
*/
package org.apache.spark.deploy.k8s
+import scala.collection.mutable
+
import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
+import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config.ConfigEntry
+
private[spark] sealed trait KubernetesRoleSpecificConf
/*
@@ -55,7 +58,8 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleSecretEnvNamesToKeyRefs: Map[String, String],
- roleEnvs: Map[String, String]) {
+ roleEnvs: Map[String, String],
+ sparkFiles: Seq[String]) {
def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
@@ -64,10 +68,14 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])
- def sparkFiles(): Seq[String] = sparkConf
- .getOption("spark.files")
- .map(str => str.split(",").toSeq)
- .getOrElse(Seq.empty[String])
+ def pyFiles(): Option[String] = sparkConf
+ .get(KUBERNETES_PYSPARK_PY_FILES)
+
+ def pySparkMainResource(): Option[String] = sparkConf
+ .get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)
+
+ def pySparkPythonVersion(): String = sparkConf
+ .get(PYSPARK_MAJOR_PYTHON_VERSION)
def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
@@ -102,17 +110,30 @@ private[spark] object KubernetesConf {
appId: String,
mainAppResource: Option[MainAppResource],
mainClass: String,
- appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+ appArgs: Array[String],
+ maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
val sparkConfWithMainAppJar = sparkConf.clone()
+ val additionalFiles = mutable.ArrayBuffer.empty[String]
mainAppResource.foreach {
- case JavaMainAppResource(res) =>
- val previousJars = sparkConf
- .getOption("spark.jars")
- .map(_.split(","))
- .getOrElse(Array.empty)
- if (!previousJars.contains(res)) {
- sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
- }
+ case JavaMainAppResource(res) =>
+ val previousJars = sparkConf
+ .getOption("spark.jars")
+ .map(_.split(","))
+ .getOrElse(Array.empty)
+ if (!previousJars.contains(res)) {
+ sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+ }
+ // This outer match exists to account for multiple non-JVM bindings,
+ // all of which raise the default MEMORY_OVERHEAD_FACTOR to 0.4.
+ case nonJVM: NonJVMResource =>
+ nonJVM match {
+ case PythonMainAppResource(res) =>
+ additionalFiles += res
+ maybePyFiles.foreach { pyFileList =>
+ additionalFiles.appendAll(pyFileList.split(","))
+ }
+ sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+ }
+ sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
}
val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
@@ -135,6 +156,11 @@ private[spark] object KubernetesConf {
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
+ val sparkFiles = sparkConf
+ .getOption("spark.files")
+ .map(str => str.split(",").toSeq)
+ .getOrElse(Seq.empty[String]) ++ additionalFiles
+
KubernetesConf(
sparkConfWithMainAppJar,
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
@@ -144,7 +170,8 @@ private[spark] object KubernetesConf {
driverAnnotations,
driverSecretNamesToMountPaths,
driverSecretEnvNamesToKeyRefs,
- driverEnvs)
+ driverEnvs,
+ sparkFiles)
}
def createExecutorConf(
@@ -186,6 +213,7 @@ private[spark] object KubernetesConf {
executorAnnotations,
executorMountSecrets,
executorEnvSecrets,
- executorEnv)
+ executorEnv,
+ Seq.empty[String])
}
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index ee629068ad90d..593fb531a004d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -52,7 +52,7 @@ private[spark] object KubernetesUtils {
}
}
- private def resolveFileUri(uri: String): String = {
+ def resolveFileUri(uri: String): String = {
val fileUri = Utils.resolveURI(uri)
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
fileScheme match {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 07bdccbe0479d..143dc8a12304e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -25,8 +25,8 @@ import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
-import org.apache.spark.launcher.SparkLauncher
private[spark] class BasicDriverFeatureStep(
conf: KubernetesConf[KubernetesDriverSpecificConf])
@@ -48,7 +48,8 @@ private[spark] class BasicDriverFeatureStep(
private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
private val memoryOverheadMiB = conf
.get(DRIVER_MEMORY_OVERHEAD)
- .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
+ .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
+ MEMORY_OVERHEAD_MIN_MIB))
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
override def configurePod(pod: SparkPod): SparkPod = {
@@ -88,13 +89,6 @@ private[spark] class BasicDriverFeatureStep(
.addToRequests("memory", driverMemoryQuantity)
.addToLimits("memory", driverMemoryQuantity)
.endResources()
- .addToArgs("driver")
- .addToArgs("--properties-file", SPARK_CONF_PATH)
- .addToArgs("--class", conf.roleSpecificConf.mainClass)
- // The user application jar is merged into the spark.jars list and managed through that
- // property, so there is no need to reference it explicitly here.
- .addToArgs(SparkLauncher.NO_RESOURCE)
- .addToArgs(conf.roleSpecificConf.appArgs: _*)
.build()
val driverPod = new PodBuilder(pod.pod)
@@ -122,7 +116,7 @@ private[spark] class BasicDriverFeatureStep(
val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
conf.sparkJars())
val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
- conf.sparkFiles())
+ conf.sparkFiles)
if (resolvedSparkJars.nonEmpty) {
additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 529069d3b8a0c..91c54a9776982 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -54,7 +54,8 @@ private[spark] class BasicExecutorFeatureStep(
private val memoryOverheadMiB = kubernetesConf
.get(EXECUTOR_MEMORY_OVERHEAD)
- .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+ .getOrElse(math.max(
+ (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
new file mode 100644
index 0000000000000..f52ec9fdc677e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+import org.apache.spark.launcher.SparkLauncher
+
+private[spark] class JavaDriverFeatureStep(
+ kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+ extends KubernetesFeatureConfigStep {
+ override def configurePod(pod: SparkPod): SparkPod = {
+ val withDriverArgs = new ContainerBuilder(pod.container)
+ .addToArgs("driver")
+ .addToArgs("--properties-file", SPARK_CONF_PATH)
+ .addToArgs("--class", kubernetesConf.roleSpecificConf.mainClass)
+ // The user application jar is merged into the spark.jars list and managed through that
+ // property, so there is no need to reference it explicitly here.
+ .addToArgs(SparkLauncher.NO_RESOURCE)
+ .addToArgs(kubernetesConf.roleSpecificConf.appArgs: _*)
+ .build()
+ SparkPod(pod.pod, withDriverArgs)
+ }
+ override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+ override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
new file mode 100644
index 0000000000000..c20bcac1f8987
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+
+private[spark] class PythonDriverFeatureStep(
+ kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+ extends KubernetesFeatureConfigStep {
+ override def configurePod(pod: SparkPod): SparkPod = {
+ val roleConf = kubernetesConf.roleSpecificConf
+ require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined")
+ val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
+ pyArgs =>
+ new EnvVarBuilder()
+ .withName(ENV_PYSPARK_ARGS)
+ .withValue(pyArgs.mkString(","))
+ .build())
+ val maybePythonFiles = kubernetesConf.pyFiles().map(
+ // Dilineation by ":" is to append the PySpark Files to the PYTHONPATH
+ // of the respective PySpark pod
+ pyFiles =>
+ new EnvVarBuilder()
+ .withName(ENV_PYSPARK_FILES)
+ .withValue(KubernetesUtils.resolveFileUrisAndPath(pyFiles.split(","))
+ .mkString(":"))
+ .build())
+ val envSeq =
+ Seq(new EnvVarBuilder()
+ .withName(ENV_PYSPARK_PRIMARY)
+ .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.pySparkMainResource().get))
+ .build(),
+ new EnvVarBuilder()
+ .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
+ .withValue(kubernetesConf.pySparkPythonVersion())
+ .build())
+ val pythonEnvs = envSeq ++
+ maybePythonArgs.toSeq ++
+ maybePythonFiles.toSeq
+
+ val withPythonPrimaryContainer = new ContainerBuilder(pod.container)
+ .addAllToEnv(pythonEnvs.asJava)
+ .addToArgs("driver-py")
+ .addToArgs("--properties-file", SPARK_CONF_PATH)
+ .addToArgs("--class", roleConf.mainClass)
+ .build()
+
+ SparkPod(pod.pod, withPythonPrimaryContainer)
+ }
+ override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+ override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index a97f5650fb869..eaff47205dbbc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -39,11 +39,13 @@ import org.apache.spark.util.Utils
* @param mainAppResource the main application resource if any
* @param mainClass the main class of the application to run
* @param driverArgs arguments to the driver
+ * @param maybePyFiles additional Python files via --py-files
*/
private[spark] case class ClientArguments(
mainAppResource: Option[MainAppResource],
mainClass: String,
- driverArgs: Array[String])
+ driverArgs: Array[String],
+ maybePyFiles: Option[String])
private[spark] object ClientArguments {
@@ -51,10 +53,15 @@ private[spark] object ClientArguments {
var mainAppResource: Option[MainAppResource] = None
var mainClass: Option[String] = None
val driverArgs = mutable.ArrayBuffer.empty[String]
+ var maybePyFiles: Option[String] = None
args.sliding(2, 2).toList.foreach {
case Array("--primary-java-resource", primaryJavaResource: String) =>
mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+ case Array("--primary-py-file", primaryPythonResource: String) =>
+ mainAppResource = Some(PythonMainAppResource(primaryPythonResource))
+ case Array("--other-py-files", pyFiles: String) =>
+ maybePyFiles = Some(pyFiles)
case Array("--main-class", clazz: String) =>
mainClass = Some(clazz)
case Array("--arg", arg: String) =>
@@ -69,7 +76,8 @@ private[spark] object ClientArguments {
ClientArguments(
mainAppResource,
mainClass.get,
- driverArgs.toArray)
+ driverArgs.toArray,
+ maybePyFiles)
}
}
@@ -206,6 +214,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
val kubernetesResourceNamePrefix = {
s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
}
+ sparkConf.set(KUBERNETES_PYSPARK_PY_FILES, clientArguments.maybePyFiles.getOrElse(""))
val kubernetesConf = KubernetesConf.createDriverConf(
sparkConf,
appName,
@@ -213,7 +222,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
kubernetesAppId,
clientArguments.mainAppResource,
clientArguments.mainClass,
- clientArguments.driverArgs)
+ clientArguments.driverArgs,
+ clientArguments.maybePyFiles)
val builder = new KubernetesDriverBuilder
val namespace = kubernetesConf.namespace()
// The master URL has been checked for validity already in SparkSubmit.
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index fdc5eb0d75832..5762d8245f778 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -17,7 +17,8 @@
package org.apache.spark.deploy.k8s.submit
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
-import org.apache.spark.deploy.k8s.features._
+import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeatureConfigStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
private[spark] class KubernetesDriverBuilder(
provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -33,9 +34,17 @@ private[spark] class KubernetesDriverBuilder(
provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> EnvSecretsFeatureStep) =
new EnvSecretsFeatureStep(_),
- provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
- => LocalDirsFeatureStep =
- new LocalDirsFeatureStep(_)) {
+ provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
+ => LocalDirsFeatureStep) =
+ new LocalDirsFeatureStep(_),
+ provideJavaStep: (
+ KubernetesConf[KubernetesDriverSpecificConf]
+ => JavaDriverFeatureStep) =
+ new JavaDriverFeatureStep(_),
+ providePythonStep: (
+ KubernetesConf[KubernetesDriverSpecificConf]
+ => PythonDriverFeatureStep) =
+ new PythonDriverFeatureStep(_)) {
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
@@ -44,13 +53,23 @@ private[spark] class KubernetesDriverBuilder(
provideCredentialsStep(kubernetesConf),
provideServiceStep(kubernetesConf),
provideLocalDirsStep(kubernetesConf))
- var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
- baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
- } else baseFeatures
- allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
- allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
- } else allFeatures
+ val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+ Some(provideSecretsStep(kubernetesConf))
+ } else None
+
+ val maybeProvideSecretsStep = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
+ Some(provideEnvSecretsStep(kubernetesConf))
+ } else None
+
+ val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map {
+ case JavaMainAppResource(_) =>
+ provideJavaStep(kubernetesConf)
+ case PythonMainAppResource(_) =>
+ providePythonStep(kubernetesConf)
+ }.getOrElse(provideJavaStep(kubernetesConf))
+
+ val allFeatures: Seq[KubernetesFeatureConfigStep] =
+ (baseFeatures :+ bindingsStep) ++
+ maybeRoleSecretNamesStep.toSeq ++
+ maybeProvideSecretsStep.toSeq
var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
index cca9f4627a1f6..cbe081ae35683 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
@@ -18,4 +18,9 @@ package org.apache.spark.deploy.k8s.submit
private[spark] sealed trait MainAppResource
+private[spark] sealed trait NonJVMResource
+
private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource
+
+private[spark] case class PythonMainAppResource(primaryResource: String)
+ extends MainAppResource with NonJVMResource
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index d5e1de36a58df..769a0a5a63047 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster.k8s
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, KubernetesFeatureConfigStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
private[spark] class KubernetesExecutorBuilder(
provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
@@ -34,14 +34,20 @@ private[spark] class KubernetesExecutorBuilder(
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
- val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
- var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
- baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
- } else baseFeatures
+ val baseFeatures = Seq(
+ provideBasicStep(kubernetesConf),
+ provideLocalDirsStep(kubernetesConf))
- allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
- allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
- } else allFeatures
+ val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+ Some(provideSecretsStep(kubernetesConf))
+ } else None
+
+ val maybeProvideSecretsStep = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
+ Some(provideEnvSecretsStep(kubernetesConf))
+ } else None
+
+ val allFeatures: Seq[KubernetesFeatureConfigStep] =
+ baseFeatures ++
+ maybeRoleSecretNamesStep.toSeq ++
+ maybeProvideSecretsStep.toSeq
var executorPod = SparkPod.initialPod()
for (feature <- allFeatures) {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index 3d23e1cb90fd2..661f942435921 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{LocalObjectReferenceBuilder, PodBuilder}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.deploy.k8s.submit._
class KubernetesConfSuite extends SparkFunSuite {
@@ -56,9 +56,10 @@ class KubernetesConfSuite extends SparkFunSuite {
APP_NAME,
RESOURCE_NAME_PREFIX,
APP_ID,
- None,
+ mainAppResource = None,
MAIN_CLASS,
- APP_ARGS)
+ APP_ARGS,
+ maybePyFiles = None)
assert(conf.appId === APP_ID)
assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap)
assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX)
@@ -79,7 +80,8 @@ class KubernetesConfSuite extends SparkFunSuite {
APP_ID,
mainAppJar,
MAIN_CLASS,
- APP_ARGS)
+ APP_ARGS,
+ maybePyFiles = None)
assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars")
.split(",")
=== Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar"))
@@ -88,15 +90,59 @@ class KubernetesConfSuite extends SparkFunSuite {
APP_NAME,
RESOURCE_NAME_PREFIX,
APP_ID,
- None,
+ mainAppResource = None,
MAIN_CLASS,
- APP_ARGS)
+ APP_ARGS,
+ maybePyFiles = None)
assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
=== Array("local:///opt/spark/jar1.jar"))
+ assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1)
}
- test("Resolve driver labels, annotations, secret mount paths, and envs.") {
+ test("Creating driver conf with a python primary file") {
+ val mainResourceFile = "local:///opt/spark/main.py"
+ val inputPyFiles = Array("local:///opt/spark/example2.py", "local:///example3.py")
val sparkConf = new SparkConf(false)
+ .setJars(Seq("local:///opt/spark/jar1.jar"))
+ .set("spark.files", "local:///opt/spark/example4.py")
+ val mainAppResource = Some(PythonMainAppResource(mainResourceFile))
+ val kubernetesConfWithMainResource = KubernetesConf.createDriverConf(
+ sparkConf,
+ APP_NAME,
+ RESOURCE_NAME_PREFIX,
+ APP_ID,
+ mainAppResource,
+ MAIN_CLASS,
+ APP_ARGS,
+ Some(inputPyFiles.mkString(",")))
+ assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
+ === Array("local:///opt/spark/jar1.jar"))
+ assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
+ assert(kubernetesConfWithMainResource.sparkFiles
+ === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles)
+ }
+
+ test("Testing explicit setting of memory overhead on non-JVM tasks") {
+ val sparkConf = new SparkConf(false)
+ .set(MEMORY_OVERHEAD_FACTOR, 0.3)
+
+ val mainResourceFile = "local:///opt/spark/main.py"
+ val mainAppResource = Some(PythonMainAppResource(mainResourceFile))
+ val conf = KubernetesConf.createDriverConf(
+ sparkConf,
+ APP_NAME,
+ RESOURCE_NAME_PREFIX,
+ APP_ID,
+ mainAppResource,
+ MAIN_CLASS,
+ APP_ARGS,
+ None)
+ assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3)
+ }
+
+ test("Resolve driver labels, annotations, secret mount paths, envs, and memory overhead") {
+ val sparkConf = new SparkConf(false)
+ .set(MEMORY_OVERHEAD_FACTOR, 0.3)
CUSTOM_LABELS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$key", value)
}
@@ -118,9 +164,10 @@ class KubernetesConfSuite extends SparkFunSuite {
APP_NAME,
RESOURCE_NAME_PREFIX,
APP_ID,
- None,
+ mainAppResource = None,
MAIN_CLASS,
- APP_ARGS)
+ APP_ARGS,
+ maybePyFiles = None)
assert(conf.roleLabels === Map(
SPARK_APP_ID_LABEL -> APP_ID,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
@@ -129,6 +176,7 @@ class KubernetesConfSuite extends SparkFunSuite {
assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
assert(conf.roleEnvs === CUSTOM_ENVS)
+ assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3)
}
test("Basic executor translated fields.") {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index b2813d8b3265d..04b909db9d9f3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -24,6 +24,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
class BasicDriverFeatureStepSuite extends SparkFunSuite {
@@ -33,6 +35,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
private val APP_NAME = "spark-test"
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
+ private val PY_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner"
private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"")
private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
@@ -60,7 +63,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
val kubernetesConf = KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
- None,
+ Some(JavaMainAppResource("")),
APP_NAME,
MAIN_CLASS,
APP_ARGS),
@@ -70,7 +73,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
- DRIVER_ENVS)
+ DRIVER_ENVS,
+ Seq.empty[String])
val featureStep = new BasicDriverFeatureStep(kubernetesConf)
val basePod = SparkPod.initialPod()
@@ -110,7 +114,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS)
assert(configuredPod.pod.getSpec.getRestartPolicy === "Never")
-
val expectedSparkConf = Map(
KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
"spark.app.id" -> APP_ID,
@@ -119,6 +122,50 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
}
+ test("Check appropriate entrypoint rerouting for various bindings") {
+ val javaSparkConf = new SparkConf()
+ .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g")
+ .set(CONTAINER_IMAGE, "spark-driver:latest")
+ val pythonSparkConf = new SparkConf()
+ .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g")
+ .set(CONTAINER_IMAGE, "spark-driver:latest")
+ val javaKubernetesConf = KubernetesConf(
+ javaSparkConf,
+ KubernetesDriverSpecificConf(
+ Some(JavaMainAppResource("")),
+ APP_NAME,
+ MAIN_CLASS,
+ APP_ARGS),
+ RESOURCE_NAME_PREFIX,
+ APP_ID,
+ DRIVER_LABELS,
+ DRIVER_ANNOTATIONS,
+ Map.empty,
+ Map.empty,
+ DRIVER_ENVS,
+ Seq.empty[String])
+ val pythonKubernetesConf = KubernetesConf(
+ pythonSparkConf,
+ KubernetesDriverSpecificConf(
+ Some(PythonMainAppResource("")),
+ APP_NAME,
+ PY_MAIN_CLASS,
+ APP_ARGS),
+ RESOURCE_NAME_PREFIX,
+ APP_ID,
+ DRIVER_LABELS,
+ DRIVER_ANNOTATIONS,
+ Map.empty,
+ Map.empty,
+ DRIVER_ENVS,
+ Seq.empty[String])
+ val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf)
+ val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf)
+ val basePod = SparkPod.initialPod()
+ val configuredJavaPod = javaFeatureStep.configurePod(basePod)
+ val configuredPythonPod = pythonFeatureStep.configurePod(basePod)
+ assert(configuredJavaPod.container.getImage === "spark-driver:latest")
+ assert(configuredPythonPod.container.getImage === "spark-driver-py:latest")
+ }
+
test("Additional system properties resolve jars and set cluster-mode confs.") {
val allJars = Seq("local:///opt/spark/jar1.jar", "hdfs:///opt/spark/jar2.jar")
val allFiles = Seq("https://localhost:9000/file1.txt", "local:///opt/spark/file2.txt")
@@ -130,7 +177,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
val kubernetesConf = KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
- None,
+ Some(JavaMainAppResource("")),
APP_NAME,
MAIN_CLASS,
APP_ARGS),
@@ -140,7 +187,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
- Map.empty)
+ DRIVER_ENVS,
+ allFiles)
val step = new BasicDriverFeatureStep(kubernetesConf)
val additionalProperties = step.getAdditionalPodSystemProperties()
val expectedSparkConf = Map(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 9182134b3337c..f06030aa55c0c 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -88,7 +88,8 @@ class BasicExecutorFeatureStepSuite
ANNOTATIONS,
Map.empty,
Map.empty,
- Map.empty))
+ Map.empty,
+ Seq.empty[String]))
val executor = step.configurePod(SparkPod.initialPod())
// The executor pod name and default labels.
@@ -126,7 +127,8 @@ class BasicExecutorFeatureStepSuite
ANNOTATIONS,
Map.empty,
Map.empty,
- Map.empty))
+ Map.empty,
+ Seq.empty[String]))
assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
}
@@ -145,7 +147,8 @@ class BasicExecutorFeatureStepSuite
ANNOTATIONS,
Map.empty,
Map.empty,
- Map("qux" -> "quux")))
+ Map("qux" -> "quux"),
+ Seq.empty[String]))
val executor = step.configurePod(SparkPod.initialPod())
checkEnv(executor,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
index f81894f8055f1..7cea83591f3e8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -60,7 +60,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
- Map.empty)
+ Map.empty,
+ Seq.empty[String])
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty)
@@ -90,7 +91,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
- Map.empty)
+ Map.empty,
+ Seq.empty[String])
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
@@ -127,7 +129,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
- Map.empty)
+ Map.empty,
+ Seq.empty[String])
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
val expectedSparkConf = Map(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index f265522a8823a..77d38bf19cd10 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -66,7 +66,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
- Map.empty))
+ Map.empty,
+ Seq.empty[String]))
assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
assert(configurationStep.getAdditionalKubernetesResources().size === 1)
assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service])
@@ -96,7 +97,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
- Map.empty))
+ Map.empty,
+ Seq.empty[String]))
val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
val expectedHostName = s"$expectedServiceName.my-namespace.svc"
@@ -116,7 +118,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
- Map.empty))
+ Map.empty,
+ Seq.empty[String]))
val resolvedService = configurationStep
.getAdditionalKubernetesResources()
.head
@@ -145,7 +148,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
- Map.empty),
+ Map.empty,
+ Seq.empty[String]),
clock)
val driverService = configurationStep
.getAdditionalKubernetesResources()
@@ -171,7 +175,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
- Map.empty),
+ Map.empty,
+ Seq.empty[String]),
clock)
fail("The driver bind address should not be allowed.")
} catch {
@@ -195,7 +200,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
- Map.empty),
+ Map.empty,
+ Seq.empty[String]),
clock)
fail("The driver host address should not be allowed.")
} catch {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
index 8b0b2d0739c76..af6b35eae484a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
@@ -44,7 +44,8 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{
Map.empty,
Map.empty,
envVarsToKeys,
- Map.empty)
+ Map.empty,
+ Seq.empty[String])
val step = new EnvSecretsFeatureStep(kubernetesConf)
val driverContainerWithEnvSecrets = step.configurePod(baseDriverPod).container
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index 2542a02d37766..bd6ce4b42fc8e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -44,7 +44,8 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
- Map.empty)
+ Map.empty,
+ Seq.empty[String])
}
test("Resolve to default local dir if neither env nor configuration are set") {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
index 9155793774123..eff75b8a15daa 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -42,7 +42,8 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
Map.empty,
secretNamesToMountPaths,
Map.empty,
- Map.empty)
+ Map.empty,
+ Seq.empty[String])
val step = new MountSecretsFeatureStep(kubernetesConf)
val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
new file mode 100644
index 0000000000000..0f2bf2fa1d9b5
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+
+class JavaDriverFeatureStepSuite extends SparkFunSuite {
+
+ test("Java Step modifies container correctly") {
+ val baseDriverPod = SparkPod.initialPod()
+ val sparkConf = new SparkConf(false)
+ val kubernetesConf = KubernetesConf(
+ sparkConf,
+ KubernetesDriverSpecificConf(
+ Some(PythonMainAppResource("local:///main.jar")),
+ "test-class",
+ "java-runner",
+ Seq("5 7")),
+ appResourceNamePrefix = "",
+ appId = "",
+ roleLabels = Map.empty,
+ roleAnnotations = Map.empty,
+ roleSecretNamesToMountPaths = Map.empty,
+ roleSecretEnvNamesToKeyRefs = Map.empty,
+ roleEnvs = Map.empty,
+ sparkFiles = Seq.empty[String])
+
+ val step = new JavaDriverFeatureStep(kubernetesConf)
+ val driverContainerWithJavaStep = step.configurePod(baseDriverPod).container
+ assert(driverContainerWithJavaStep.getArgs.size === 7)
+ val args = driverContainerWithJavaStep
+ .getArgs.asScala
+ assert(args === List(
+ "driver",
+ "--properties-file", SPARK_CONF_PATH,
+ "--class", "test-class",
+ "spark-internal", "5 7"))
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
new file mode 100644
index 0000000000000..a1f9a5d9e264e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
+
+class PythonDriverFeatureStepSuite extends SparkFunSuite {
+
+ test("Python Step modifies container correctly") {
+ val expectedMainResource = "/main.py"
+ val mainResource = "local:///main.py"
+ val pyFiles = Seq("local:///example2.py", "local:///example3.py")
+ val expectedPySparkFiles =
+ "/example2.py:/example3.py"
+ val baseDriverPod = SparkPod.initialPod()
+ val sparkConf = new SparkConf(false)
+ .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
+ .set(KUBERNETES_PYSPARK_PY_FILES, pyFiles.mkString(","))
+ .set("spark.files", "local:///example.py")
+ .set(PYSPARK_MAJOR_PYTHON_VERSION, "2")
+ val kubernetesConf = KubernetesConf(
+ sparkConf,
+ KubernetesDriverSpecificConf(
+ Some(PythonMainAppResource("local:///main.py")),
+ "test-app",
+ "python-runner",
+ Seq("5 7")),
+ appResourceNamePrefix = "",
+ appId = "",
+ roleLabels = Map.empty,
+ roleAnnotations = Map.empty,
+ roleSecretNamesToMountPaths = Map.empty,
+ roleSecretEnvNamesToKeyRefs = Map.empty,
+ roleEnvs = Map.empty,
+ sparkFiles = Seq.empty[String])
+
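+ // The Python step should publish the resolved primary resource, py-files,
+ // application args, and Python version to the container as env variables.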
+ val step = new PythonDriverFeatureStep(kubernetesConf)
+ val driverContainerWithPySpark = step.configurePod(baseDriverPod).container
+ assert(driverContainerWithPySpark.getEnv.size === 4)
+ val envs = driverContainerWithPySpark
+ .getEnv
+ .asScala
+ .map(env => (env.getName, env.getValue))
+ .toMap
+ assert(envs(ENV_PYSPARK_PRIMARY) === expectedMainResource)
+ assert(envs(ENV_PYSPARK_FILES) === expectedPySparkFiles)
+ assert(envs(ENV_PYSPARK_ARGS) === "5 7")
+ assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "2")
+ }
+
+ test("Python Step handles empty pyFiles") {
+ val mainResource = "local:///main.py"
+ val baseDriverPod = SparkPod.initialPod()
+ val sparkConf = new SparkConf(false)
+ .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
+ .set(PYSPARK_MAJOR_PYTHON_VERSION, "3")
+ val kubernetesConf = KubernetesConf(
+ sparkConf,
+ KubernetesDriverSpecificConf(
+ Some(PythonMainAppResource("local:///main.py")),
+ "test-class-py",
+ "python-runner",
+ Seq.empty[String]),
+ appResourceNamePrefix = "",
+ appId = "",
+ roleLabels = Map.empty,
+ roleAnnotations = Map.empty,
+ roleSecretNamesToMountPaths = Map.empty,
+ roleSecretEnvNamesToKeyRefs = Map.empty,
+ roleEnvs = Map.empty,
+ sparkFiles = Seq.empty[String])
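+ // With no py-files configured, the step should still emit the driver-py
+ // launch args and the requested Python major version env var.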
+ val step = new PythonDriverFeatureStep(kubernetesConf)
+ val driverContainerWithPySpark = step.configurePod(baseDriverPod).container
+ val args = driverContainerWithPySpark.getArgs.asScala
+ assert(args.size === 5)
+ assert(args === List(
+ "driver-py",
+ "--properties-file", SPARK_CONF_PATH,
+ "--class", "test-class-py"))
+ val envs = driverContainerWithPySpark
+ .getEnv
+ .asScala
+ .map(env => (env.getName, env.getValue))
+ .toMap
+ assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3")
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 0775338098a13..a8a8218c621ea 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -143,7 +143,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
- Map.empty)
+ Map.empty,
+ Seq.empty[String])
when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(POD_NAME)).thenReturn(namedPods)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index cb724068ea4f3..4e8c300543430 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
class KubernetesDriverBuilderSuite extends SparkFunSuite {
@@ -27,6 +28,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val SERVICE_STEP_TYPE = "service"
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
private val SECRETS_STEP_TYPE = "mount-secrets"
+ private val JAVA_STEP_TYPE = "java-bindings"
+ private val PYSPARK_STEP_TYPE = "pyspark-bindings"
private val ENV_SECRETS_STEP_TYPE = "env-secrets"
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
@@ -44,6 +47,12 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
+ private val javaStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+ JAVA_STEP_TYPE, classOf[JavaDriverFeatureStep])
+
+ private val pythonStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+ PYSPARK_STEP_TYPE, classOf[PythonDriverFeatureStep])
+
private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
@@ -54,13 +63,15 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
_ => serviceStep,
_ => secretsStep,
_ => envSecretsStep,
- _ => localDirsStep)
+ _ => localDirsStep,
+ _ => javaStep,
+ _ => pythonStep)
test("Apply fundamental steps all the time.") {
val conf = KubernetesConf(
new SparkConf(false),
KubernetesDriverSpecificConf(
- None,
+ Some(JavaMainAppResource("example.jar")),
"test-app",
"main",
Seq.empty),
@@ -70,13 +81,15 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map.empty,
- Map.empty)
+ Map.empty,
+ Seq.empty[String])
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
SERVICE_STEP_TYPE,
- LOCAL_DIRS_STEP_TYPE)
+ LOCAL_DIRS_STEP_TYPE,
+ JAVA_STEP_TYPE)
}
test("Apply secrets step if secrets are present.") {
@@ -93,7 +106,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map("secret" -> "secretMountPath"),
Map("EnvName" -> "SecretName:secretKey"),
- Map.empty)
+ Map.empty,
+ Seq.empty[String])
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
@@ -101,8 +115,58 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
SERVICE_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
SECRETS_STEP_TYPE,
- ENV_SECRETS_STEP_TYPE
- )
+ ENV_SECRETS_STEP_TYPE,
+ JAVA_STEP_TYPE)
+ }
+
+ test("Apply Java step if main resource is none.") {
+ val conf = KubernetesConf(
+ new SparkConf(false),
+ KubernetesDriverSpecificConf(
+ None,
+ "test-app",
+ "main",
+ Seq.empty),
+ "prefix",
+ "appId",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Seq.empty[String])
+ validateStepTypesApplied(
+ builderUnderTest.buildFromFeatures(conf),
+ BASIC_STEP_TYPE,
+ CREDENTIALS_STEP_TYPE,
+ SERVICE_STEP_TYPE,
+ LOCAL_DIRS_STEP_TYPE,
+ JAVA_STEP_TYPE)
+ }
+
+ test("Apply Python step if main resource is python.") {
+ val conf = KubernetesConf(
+ new SparkConf(false),
+ KubernetesDriverSpecificConf(
+ Some(PythonMainAppResource("example.py")),
+ "test-app",
+ "main",
+ Seq.empty),
+ "prefix",
+ "appId",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Seq.empty[String])
+ validateStepTypesApplied(
+ builderUnderTest.buildFromFeatures(conf),
+ BASIC_STEP_TYPE,
+ CREDENTIALS_STEP_TYPE,
+ SERVICE_STEP_TYPE,
+ LOCAL_DIRS_STEP_TYPE,
+ PYSPARK_STEP_TYPE)
}
private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index 753cd30a237f3..a6bc8bce32926 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -54,7 +54,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map.empty,
- Map.empty)
+ Map.empty,
+ Seq.empty[String])
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
}
@@ -70,7 +71,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
Map.empty,
Map("secret" -> "secretMountPath"),
Map("secret-name" -> "secret-key"),
- Map.empty)
+ Map.empty,
+ Seq.empty[String])
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
new file mode 100644
index 0000000000000..72bb9620b45de
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
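+# base_img is supplied at build time by bin/docker-image-tool.sh, which passes
+# --build-arg base_img=<the already-built Spark base image>.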
+ARG base_img
+FROM $base_img
+WORKDIR /
+RUN mkdir ${SPARK_HOME}/python
+COPY python/lib ${SPARK_HOME}/python/lib
+# TODO: Investigate running both pip and pip3 via virtualenvs
+RUN apk add --no-cache python && \
+ apk add --no-cache python3 && \
+ python -m ensurepip && \
+ python3 -m ensurepip && \
+ # Remove ensurepip since pip is already installed on the image;
+ # ensurepip adds no functionality and takes up ~1.6MB
+ rm -r /usr/lib/python*/ensurepip && \
+ pip install --upgrade pip setuptools && \
+ # Python 3 packages can be installed with pip3
+ # Remove the pip cache to save space
+ rm -r /root/.cache
+
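+# Make the bundled PySpark and Py4J archives importable. Note that PYTHONPATH
+# entries are literal, so the py4j-*.zip glob is not expanded by Python itself.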
+ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip
+
+WORKDIR /opt/spark/work-dir
+ENTRYPOINT [ "/opt/entrypoint.sh" ]
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 3e166116aa3fd..acdb4b1f09e0a 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -53,6 +53,31 @@ if [ -n "$SPARK_MOUNTED_FILES_DIR" ]; then
cp -R "$SPARK_MOUNTED_FILES_DIR/." .
fi
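+# Make any distributed py-files importable by appending them to PYTHONPATH.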
+if [ -n "$PYSPARK_FILES" ]; then
+ PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
+fi
+
+PYSPARK_ARGS=""
+if [ -n "$PYSPARK_APP_ARGS" ]; then
+ PYSPARK_ARGS="$PYSPARK_APP_ARGS"
+fi
+
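+# Select the interpreter for the requested Python major version and export the
+# variables that PySpark consults; "${pyv:7}" strips the leading "Python "
+# prefix from the interpreter's version banner.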
+if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "2" ]; then
+ pyv="$(python -V 2>&1)"
+ export PYTHON_VERSION="${pyv:7}"
+ export PYSPARK_PYTHON="python"
+ export PYSPARK_DRIVER_PYTHON="python"
+elif [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "3" ]; then
+ pyv3="$(python3 -V 2>&1)"
+ export PYTHON_VERSION="${pyv3:7}"
+ export PYSPARK_PYTHON="python3"
+ export PYSPARK_DRIVER_PYTHON="python3"
+fi
+
case "$SPARK_K8S_CMD" in
driver)
CMD=(
@@ -62,6 +87,16 @@ case "$SPARK_K8S_CMD" in
"$@"
)
;;
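+ # driver-py mirrors the JVM driver invocation, appending the primary Python
+ # resource and any application arguments after the submit options.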
+ driver-py)
+ CMD=(
+ "$SPARK_HOME/bin/spark-submit"
+ --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
+ --deploy-mode client
+ "$@" $PYSPARK_PRIMARY $PYSPARK_ARGS
+ )
+ ;;
executor)
CMD=(