Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev/make-distribution.sh
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ fi
# Bundle the Kubernetes resources into the distribution, but only when the
# kubernetes core module has a target/ directory under $SPARK_HOME.
if [ -d "$SPARK_HOME"/resource-managers/kubernetes/core/target/ ]; then
mkdir -p "$DISTDIR/kubernetes/"
# Dockerfile sources end up in $DISTDIR/kubernetes/dockerfiles.
cp -a "$SPARK_HOME"/resource-managers/kubernetes/docker/src/main/dockerfiles "$DISTDIR/kubernetes/"
# Integration-test resources end up in $DISTDIR/kubernetes/tests.
cp -a "$SPARK_HOME"/resource-managers/kubernetes/integration-tests/tests "$DISTDIR/kubernetes/"
fi

# Copy examples and dependencies
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ of the most common options to set are:
unless otherwise specified. If set, PySpark memory for an executor will be
limited to this amount. If not set, Spark will not limit Python's memory use
and it is up to the application to avoid exceeding the overhead memory space
shared with other non-JVM processes. When PySpark is run in YARN, this memory
shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory
is added to executor resource requests.
</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ private[spark] object Config extends Logging {
"Ensure that major Python version is either Python2 or Python3")
.createWithDefault("2")

val APP_RESOURCE_TYPE =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this instead of the bools? What about folks who want to make a pipeline which is both R and Python?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is that we are already running binding steps that configure the driver based on the app resource, so I thought the same binding bootstrap step might as well pass the config down to the executors.

Currently, we don't have any Dockerfiles that handle mixed pipelines, so such configurations should be addressed in a follow-up PR, IMO. But I am open to suggestions (that are testable).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's do something in a follow-up after 2.4.

ConfigBuilder("spark.kubernetes.resource.type")
.doc("This sets the resource type internally")
.internal()
.stringConf
.createOptional

val KUBERNETES_LOCAL_DIRS_TMPFS =
ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")
.doc("If set to true then emptyDir volumes created to back SPARK_LOCAL_DIRS will have " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, PYSPARK_EXECUTOR_MEMORY}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -58,6 +58,16 @@ private[spark] class BasicExecutorFeatureStep(
(kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
// Total memory to request for the executor container. Only applications whose resource
// type is "python" add the optional PYSPARK_EXECUTOR_MEMORY amount (0 when unset) on top
// of the executor memory plus overhead; every other case requests just memory + overhead.
private val executorMemoryTotal =
  kubernetesConf.sparkConf.getOption(APP_RESOURCE_TYPE.key) match {
    case Some("python") =>
      val pySparkMemoryMiB =
        kubernetesConf.sparkConf.get(PYSPARK_EXECUTOR_MEMORY).fold(0)(_.toInt)
      executorMemoryWithOverhead + pySparkMemoryMiB
    case _ =>
      executorMemoryWithOverhead
  }

private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
private val executorCoresRequest =
Expand All @@ -76,7 +86,7 @@ private[spark] class BasicExecutorFeatureStep(
// executorId
val hostname = name.substring(Math.max(0, name.length - 63))
val executorMemoryQuantity = new QuantityBuilder(false)
.withAmount(s"${executorMemoryWithOverhead}Mi")
.withAmount(s"${executorMemoryTotal}Mi")
.build()
val executorCpuQuantity = new QuantityBuilder(false)
.withAmount(executorCoresRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.features.bindings
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata}

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
import org.apache.spark.launcher.SparkLauncher
Expand All @@ -38,7 +39,8 @@ private[spark] class JavaDriverFeatureStep(
.build()
SparkPod(pod.pod, withDriverArgs)
}
override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
override def getAdditionalPodSystemProperties(): Map[String, String] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is this going to override it if the user wants to set a different type, even if they are ostensibly running a JVM pod (e.g. mixed-language pipelines)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, not sure what our direction is going to be with respect to mixed pipelines throughout the cluster manager sections - if we should be supporting it in a first class way then perhaps we file a JIRA and we can discuss how the submission client should be refactored to support that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filed SPARK-25373 - Support mixed language pipelines on Spark on K8s

Map(APP_RESOURCE_TYPE.key -> "java")

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

Expand Down Expand Up @@ -68,7 +69,8 @@ private[spark] class PythonDriverFeatureStep(

SparkPod(pod.pod, withPythonPrimaryContainer)
}
override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
override def getAdditionalPodSystemProperties(): Map[String, String] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above (just curious whether this overrides user params, and what the precedence rules are here). Also, please add a comment describing the precedence rules.

Map(APP_RESOURCE_TYPE.key -> "python")

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

Expand Down Expand Up @@ -54,7 +55,8 @@ private[spark] class RDriverFeatureStep(

SparkPod(pod.pod, withRPrimaryContainer)
}
override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
override def getAdditionalPodSystemProperties(): Map[String, String] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

Map(APP_RESOURCE_TYPE.key -> "r")

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
MAIN_CLASS,
APP_ARGS)


test("Check the pod respects all configurations from the user.") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class BasicExecutorFeatureStepSuite
.set("spark.driver.host", DRIVER_HOSTNAME)
.set("spark.driver.port", DRIVER_PORT.toString)
.set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
.set("spark.kubernetes.resource.type", "java")
}

test("basic executor pod has reasonable defaults") {
Expand Down Expand Up @@ -161,6 +162,29 @@ class BasicExecutorFeatureStepSuite
checkOwnerReferences(executor.pod, DRIVER_POD_UID)
}

test("test executor pyspark memory") {
  // Flag the application as PySpark and configure 42 (MiB) of PySpark executor memory.
  val pySparkConf = baseConf.clone()
    .set("spark.kubernetes.resource.type", "python")
    .set(org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY, 42L)

  val executorConf = KubernetesConf(
    pySparkConf,
    KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)),
    RESOURCE_NAME_PREFIX,
    APP_ID,
    LABELS,
    ANNOTATIONS,
    Map.empty,
    Map.empty,
    Map.empty,
    Nil,
    Seq.empty[String])
  val configuredPod =
    new BasicExecutorFeatureStep(executorConf).configurePod(SparkPod.initialPod())

  // Expected request: executor memory with overhead (1408) + PySpark memory (42) = 1450.
  val memoryRequest = configuredPod.container.getResources.getRequests.get("memory").getAmount
  assert(memoryRequest === "1450Mi")
}

// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,5 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite {
"--properties-file", SPARK_CONF_PATH,
"--class", "test-class",
"spark-internal", "5 7"))

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY ${img_path}/spark/entrypoint.sh /opt/
COPY examples /opt/spark/examples
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data

ENV SPARK_HOME /opt/spark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ ARG base_img
FROM $base_img
WORKDIR /
RUN mkdir ${SPARK_HOME}/R
COPY R ${SPARK_HOME}/R

RUN apk add --no-cache R R-dev

COPY R ${SPARK_HOME}/R
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the docker build not run the R and R-dev installations each time an update to the jar is made. This is a minor change that helps with dev :)

ENV R_HOME /usr/lib/R

WORKDIR /opt/spark/work-dir
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ ARG base_img
FROM $base_img
WORKDIR /
RUN mkdir ${SPARK_HOME}/python
COPY python/lib ${SPARK_HOME}/python/lib
# TODO: Investigate running both pip and pip3 via virtualenvs
RUN apk add --no-cache python && \
apk add --no-cache python3 && \
Expand All @@ -33,6 +32,7 @@ RUN apk add --no-cache python && \
# Removed the .cache to save space
rm -r /root/.cache

COPY python/lib ${SPARK_HOME}/python/lib
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question: is this change intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the R change above ^^

ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip

WORKDIR /opt/spark/work-dir
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ private[spark] class KubernetesSuite extends SparkFunSuite
protected var containerLocalSparkDistroExamplesJar: String = _
protected var appLocator: String = _

// Expected memory requests (as Kubernetes quantity strings) asserted by the pod checkers.
// Default memory limit is 1024M + 384M (minimum overhead constant)
private val baseMemory = s"${1024 + 384}Mi"
// Overhead factor used for the customized-memory expectations below (0.8 * 1024Mi).
protected val memOverheadConstant = 0.8
// Expected request for the Python and R pod checks: 1024Mi plus 0.4 * 1024Mi.
// NOTE(review): 0.4 is presumably the default non-JVM overhead factor — confirm.
private val standardNonJVMMemory = s"${(1024 + 0.4*1024).toInt}Mi"
// Extra memory, in Mi, added on top of the executor total below.
protected val additionalMemory = 200
// 209715200 is 200Mi
protected val additionalMemoryInBytes = 209715200
// Expected driver request with the custom overhead factor: 1024Mi + 0.8 * 1024Mi.
private val extraDriverTotalMemory = s"${(1024 + memOverheadConstant*1024).toInt}Mi"
// Expected executor request: the driver formula above plus `additionalMemory`.
private val extraExecTotalMemory =
s"${(1024 + memOverheadConstant*1024 + additionalMemory).toInt}Mi"

override def beforeAll(): Unit = {
// The scalatest-maven-plugin gives system properties that are referenced but not set null
// values. We need to remove the null-value properties before initializing the test backend.
Expand Down Expand Up @@ -233,35 +244,57 @@ private[spark] class KubernetesSuite extends SparkFunSuite
assert(driverPod.getMetadata.getName === driverPodName)
assert(driverPod.getSpec.getContainers.get(0).getImage === image)
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== baseMemory)
}


// Checks the driver pod's name, that its first container runs `pyImage` under the name
// "spark-kubernetes-driver", and that it requests the standard non-JVM memory amount.
protected def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
  assert(driverPod.getMetadata.getName === driverPodName)
  val driverContainer = driverPod.getSpec.getContainers.get(0)
  assert(driverContainer.getImage === pyImage)
  assert(driverContainer.getName === "spark-kubernetes-driver")
  val requestedMemory = driverContainer.getResources.getRequests.get("memory").getAmount
  assert(requestedMemory === standardNonJVMMemory)
}

// Checks the driver pod's name, that its first container runs `rImage` under the name
// "spark-kubernetes-driver", and that it requests the standard non-JVM memory amount.
protected def doBasicDriverRPodCheck(driverPod: Pod): Unit = {
  assert(driverPod.getMetadata.getName === driverPodName)
  val driverContainer = driverPod.getSpec.getContainers.get(0)
  assert(driverContainer.getImage === rImage)
  assert(driverContainer.getName === "spark-kubernetes-driver")
  val requestedMemory = driverContainer.getResources.getRequests.get("memory").getAmount
  assert(requestedMemory === standardNonJVMMemory)
}


// Checks that the executor pod's first container runs `image` under the name "executor"
// and requests the default (JVM) amount of memory.
protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
  val executorContainer = executorPod.getSpec.getContainers.get(0)
  assert(executorContainer.getImage === image)
  assert(executorContainer.getName === "executor")
  val requestedMemory = executorContainer.getResources.getRequests.get("memory").getAmount
  assert(requestedMemory === baseMemory)
}

// Checks that the executor pod's first container runs `pyImage` under the name "executor"
// and requests the standard non-JVM amount of memory.
protected def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
  val executorContainer = executorPod.getSpec.getContainers.get(0)
  assert(executorContainer.getImage === pyImage)
  assert(executorContainer.getName === "executor")
  val requestedMemory = executorContainer.getResources.getRequests.get("memory").getAmount
  assert(requestedMemory === standardNonJVMMemory)
}

// Checks that the executor pod's first container runs `rImage` under the name "executor"
// and requests the standard non-JVM amount of memory.
protected def doBasicExecutorRPodCheck(executorPod: Pod): Unit = {
  val executorContainer = executorPod.getSpec.getContainers.get(0)
  assert(executorContainer.getImage === rImage)
  assert(executorContainer.getName === "executor")
  val requestedMemory = executorContainer.getResources.getRequests.get("memory").getAmount
  assert(requestedMemory === standardNonJVMMemory)
}

// Checks that the driver pod's first container requests the memory expected when the
// custom overhead factor is applied (`extraDriverTotalMemory`).
protected def doDriverMemoryCheck(driverPod: Pod): Unit = {
  val driverContainer = driverPod.getSpec.getContainers.get(0)
  val requestedMemory = driverContainer.getResources.getRequests.get("memory").getAmount
  assert(requestedMemory === extraDriverTotalMemory)
}

// Checks that the executor pod's first container requests the memory expected when the
// custom overhead factor and the additional memory are applied (`extraExecTotalMemory`).
protected def doExecutorMemoryCheck(executorPod: Pod): Unit = {
  val executorContainer = executorPod.getSpec.getContainers.get(0)
  val requestedMemory = executorContainer.getResources.getRequests.get("memory").getAmount
  assert(requestedMemory === extraExecTotalMemory)
}

protected def checkCustomSettings(pod: Pod): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
import PythonTestsSuite._
import KubernetesSuite.k8sTestTag

private val pySparkDockerImage =
s"${getTestImageRepo}/spark-py:${getTestImageTag}"
test("Run PySpark on simple pi.py example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
.set("spark.kubernetes.container.image", pySparkDockerImage)
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_PI,
mainClass = "",
Expand All @@ -39,7 +41,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>

test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.pyspark.pythonVersion", "2")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
Expand All @@ -57,7 +59,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>

test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.pyspark.pythonVersion", "3")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
Expand All @@ -72,12 +74,32 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}

test("Run PySpark with memory customization", k8sTestTag) {
  // Use the PySpark image with Python 3, a custom memory overhead factor and extra
  // PySpark executor memory, so the pod checkers can verify the resulting requests.
  sparkAppConf
    .set("spark.kubernetes.container.image", pySparkDockerImage)
    .set("spark.kubernetes.pyspark.pythonVersion", "3")
    .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant")
    .set("spark.executor.pyspark.memory", s"${additionalMemory}m")

  // The application receives the additional memory in bytes as its only argument and is
  // expected to log the line below on completion.
  val expectedLog = Seq("PySpark Worker Memory Check is: True")
  val memoryCheckArgs = Array(s"$additionalMemoryInBytes")

  runSparkApplicationAndVerifyCompletion(
    appResource = PYSPARK_MEMORY_CHECK,
    mainClass = "",
    expectedLogOnCompletion = expectedLog,
    appArgs = memoryCheckArgs,
    driverPodChecker = doDriverMemoryCheck,
    executorPodChecker = doExecutorMemoryCheck,
    appLocator = appLocator,
    isJVM = false,
    pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}
}

private[spark] object PythonTestsSuite {
val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py"
val TEST_LOCAL_PYSPARK: String = "local:///opt/spark/tests/"
val PYSPARK_FILES: String = TEST_LOCAL_PYSPARK + "pyfiles.py"
val PYSPARK_CONTAINER_TESTS: String = TEST_LOCAL_PYSPARK + "py_container_checks.py"
val PYSPARK_MEMORY_CHECK: String = TEST_LOCAL_PYSPARK + "worker_memory_check.py"
}

Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
.delete()
}

// TODO: [SPARK-25291] This test is flaky with regards to memory of executors
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mccheah This test periodically fails because the executors are not assigned the expected memory. I have filed a JIRA for it: SPARK-25291.

test("Run SparkPi with env and mount secrets.", k8sTestTag) {
createTestSecret()
sparkAppConf
Expand Down
Loading