@@ -40,6 +40,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
private var kubernetesTestComponents: KubernetesTestComponents = _
private var sparkAppConf: SparkAppConf = _
private var image: String = _
private var pyImage: String = _
private var containerLocalSparkDistroExamplesJar: String = _
private var appLocator: String = _
private var driverPodName: String = _
@@ -62,6 +63,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
val imageTag = getTestImageTag
val imageRepo = getTestImageRepo
image = s"$imageRepo/spark:$imageTag"
pyImage = s"$imageRepo/spark-py:$imageTag"

val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
.toFile
@@ -184,20 +186,72 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME))
}

test("Run PySpark on simple pi.py example") {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_PI,
mainClass = "",
expectedLogOnCompletion = Seq("Pi is roughly 3"),
appArgs = Array("5"),
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
appLocator = appLocator,
isJVM = false)
}
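
For reference, `pi.py` computes a Monte Carlo estimate of pi, and the `"5"` argument is its partition count. Below is a minimal Scala sketch of the same computation (the object name and the per-partition sample count are illustrative, and a live `SparkSession` is assumed):

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Random

// Sketch of the Monte Carlo pi estimate that pi.py performs; the test's
// "5" argument is the partition count. The sample size is a stand-in
// value, not what the example actually uses.
object PiSketch {
  def estimate(spark: SparkSession, partitions: Int): Double = {
    val n = 100000L * partitions
    val inside = spark.sparkContext
      .parallelize(1L to n, partitions)
      .map { _ =>
        val x = Random.nextDouble() * 2 - 1
        val y = Random.nextDouble() * 2 - 1
        if (x * x + y * y <= 1) 1L else 0L
      }
      .reduce(_ + _)
    // The driver logs "Pi is roughly <value>", which the test greps for.
    4.0 * inside / n
  }
}
```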

test("Run PySpark with Python2 to test a pyfiles example") {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
.set("spark.kubernetes.pyspark.pythonversion", "2")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
mainClass = "",
expectedLogOnCompletion = Seq(
"Python runtime version check is: True",
"Python environment version check is: True"),
appArgs = Array("python"),
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
appLocator = appLocator,
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}

test("Run PySpark with Python3 to test a pyfiles example") {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
.set("spark.kubernetes.pyspark.pythonversion", "3")

[Review thread]
Member: Nit: looks like pythonVersion is the right one based on the style of existing Spark config properties.
Member (Author): hehe... well... I will make sure to change that style in a follow-up PR to core.
Member: Sounds good.

runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
mainClass = "",
expectedLogOnCompletion = Seq(
"Python runtime version check is: True",
"Python environment version check is: True"),
appArgs = Array("python3"),
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
appLocator = appLocator,
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}
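
A small possible cleanup, not part of this diff: `beforeAll` already computes `pyImage` (first hunk above), so these three tests could reference it instead of re-interpolating the repo and tag. A one-line sketch, assuming `pyImage` is visible here:

```scala
// Hypothetical simplification: reuse the pyImage field set in beforeAll.
sparkAppConf.set("spark.kubernetes.container.image", pyImage)
```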

private def runSparkPiAndVerifyCompletion(
appResource: String = containerLocalSparkDistroExamplesJar,
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
appArgs: Array[String] = Array.empty[String],
appLocator: String = appLocator): Unit = {
appLocator: String = appLocator,
isJVM: Boolean = true): Unit = {
runSparkApplicationAndVerifyCompletion(
appResource,
SPARK_PI_MAIN_CLASS,
Seq("Pi is roughly 3"),
appArgs,
driverPodChecker,
executorPodChecker,
appLocator)
appLocator,
isJVM)
}
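
Because `isJVM` defaults to `true`, existing JVM callers of this helper keep working without changes; an illustrative call (the test name is a stand-in, not quoted from this suite):

```scala
// Illustrative: a JVM test picks up isJVM = true from the new default
// and needs no changes after this diff.
test("Run SparkPi with no resources") {
  runSparkPiAndVerifyCompletion()
}
```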

private def runSparkRemoteCheckAndVerifyCompletion(
@@ -213,7 +267,8 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
appArgs,
driverPodChecker,
executorPodChecker,
appLocator)
appLocator,
true)
}

private def runSparkJVMCheckAndVerifyCompletion(
@@ -226,7 +281,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
mainAppResource = appResource,
mainClass = mainClass,
appArgs = appArgs)
SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir, true)

val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
@@ -255,12 +310,20 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
appArgs: Array[String],
driverPodChecker: Pod => Unit,
executorPodChecker: Pod => Unit,
appLocator: String): Unit = {
appLocator: String,
isJVM: Boolean,
pyFiles: Option[String] = None): Unit = {
val appArguments = SparkAppArguments(
mainAppResource = appResource,
mainClass = mainClass,
appArgs = appArgs)
SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
SparkAppLauncher.launch(
appArguments,
sparkAppConf,
TIMEOUT.value.toSeconds.toInt,
sparkHomeDir,
isJVM,
pyFiles)

val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
@@ -298,11 +361,22 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
}

private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
assert(driverPod.getMetadata.getName === driverPodName)
assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage)
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
}

private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === image)
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
}

private def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
}
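
Because the checkers are plain `Pod => Unit` values, tests can layer extra assertions on top of them; a hypothetical composition inside the suite (the `spark-role` label is an assumption about how driver pods are labeled, not something this diff asserts):

```scala
import io.fabric8.kubernetes.api.model.Pod

// Hypothetical composed checker: run the basic Python driver check, then
// assert one more property. The spark-role=driver label is an assumption.
val labeledPyDriverCheck: Pod => Unit = { pod =>
  doBasicDriverPyPodCheck(pod)
  assert(pod.getMetadata.getLabels.get("spark-role") === "driver")
}
```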

private def checkCustomSettings(pod: Pod): Unit = {
assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
@@ -384,7 +458,11 @@ private[spark] object KubernetesSuite {
val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"

val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py"
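
These `local://` URIs name files baked into the `spark-py` container image rather than files on the machine running spark-submit; a quick sketch of what the scheme strips down to:

```scala
// local:// paths resolve inside the container image, e.g. for PYSPARK_PI:
val inContainerPath = KubernetesSuite.PYSPARK_PI.stripPrefix("local://")
// inContainerPath == "/opt/spark/examples/src/main/python/pi.py"
```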

val TEST_SECRET_NAME_PREFIX = "test-secret-"
val TEST_SECRET_KEY = "test-key"
val TEST_SECRET_VALUE = "test-data"

@@ -99,15 +99,25 @@ private[spark] object SparkAppLauncher extends Logging {
appArguments: SparkAppArguments,
appConf: SparkAppConf,
timeoutSecs: Int,
sparkHomeDir: Path): Unit = {
sparkHomeDir: Path,
isJVM: Boolean,
pyFiles: Option[String] = None): Unit = {
val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit"))
logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
val commandLine = mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
val preCommandLine = if (isJVM) {
mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
"--deploy-mode", "cluster",
"--class", appArguments.mainClass,
"--master", appConf.get("spark.master")
) ++ appConf.toStringArray :+
appArguments.mainAppResource
"--master", appConf.get("spark.master"))
} else {
mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
"--deploy-mode", "cluster",
"--master", appConf.get("spark.master"))
}
val commandLine =
pyFiles.map(s => preCommandLine ++ Array("--py-files", s)).getOrElse(preCommandLine) ++
appConf.toStringArray :+ appArguments.mainAppResource

if (appArguments.appArgs.nonEmpty) {
commandLine += appArguments.appArgs.mkString(" ")
}
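
To make the assembly above concrete, here is a self-contained sketch that mirrors the logic with hard-coded stand-in values (the spark-submit path, master URL, image, and conf are placeholders, not what the tests actually resolve from `sparkHomeDir` and `appConf`):

```scala
import scala.collection.mutable

// Stand-alone sketch of the command-line assembly above, using the
// PySpark pyfiles case (isJVM = false, one --py-files entry).
object LaunchCommandSketch extends App {
  val sparkSubmit = "/opt/spark/bin/spark-submit"      // stand-in path
  val master = "k8s://https://kubernetes.default.svc"  // stand-in master
  val isJVM = false                                    // PySpark case
  val mainClass = ""                                   // unused when isJVM = false
  val pyFiles: Option[String] =
    Some("local:///opt/spark/examples/src/main/python/py_container_checks.py")
  val confArgs = Array("--conf", "spark.kubernetes.container.image=repo/spark-py:tag")
  val mainAppResource = "local:///opt/spark/examples/src/main/python/pyfiles.py"

  // JVM apps pass --class; Python apps rely on the .py primary resource.
  val preCommandLine = if (isJVM) {
    mutable.ArrayBuffer(sparkSubmit,
      "--deploy-mode", "cluster",
      "--class", mainClass,
      "--master", master)
  } else {
    mutable.ArrayBuffer(sparkSubmit,
      "--deploy-mode", "cluster",
      "--master", master)
  }
  // --py-files is spliced in only when pyFiles is defined, as in the diff.
  val commandLine =
    pyFiles.map(s => preCommandLine ++ Array("--py-files", s)).getOrElse(preCommandLine) ++
      confArgs :+ mainAppResource

  println(commandLine.mkString(" "))
}
```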