From b1703d9da0433150a89d55abd780a6866617879e Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Wed, 18 Apr 2018 01:17:49 -0400 Subject: [PATCH 1/4] added pyspark testing --- .../k8s/integrationtest/KubernetesSuite.scala | 49 ++++++++++++++++--- .../KubernetesTestComponents.scala | 16 ++++-- 2 files changed, 53 insertions(+), 12 deletions(-) diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 04782d9..bf28fbf 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -40,6 +40,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit private var kubernetesTestComponents: KubernetesTestComponents = _ private var sparkAppConf: SparkAppConf = _ private var image: String = _ + private var pyImage: String = _ private var containerLocalSparkDistroExamplesJar: String = _ private var appLocator: String = _ private var driverPodName: String = _ @@ -62,6 +63,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit val imageTag = getTestImageTag val imageRepo = getTestImageRepo image = s"$imageRepo/spark:$imageTag" + pyImage = s"$imageRepo/spark-py:$imageTag" val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars")) .toFile @@ -184,12 +186,28 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME)) } + test("Run PySpark on simple pi.py example") { + sparkAppConf + .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") + runSparkApplicationAndVerifyCompletion( + appResource = PYSPARK_PI, + mainClass = "", + expectedLogOnCompletion = Seq("Pi is roughly 3"), + appArgs = Array("5"), + driverPodChecker = doBasicDriverPyPodCheck, + executorPodChecker = doBasicExecutorPyPodCheck, + appLocator = appLocator, + isJVM = false) + } + + private def runSparkPiAndVerifyCompletion( appResource: String = containerLocalSparkDistroExamplesJar, driverPodChecker: Pod => Unit = doBasicDriverPodCheck, executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, appArgs: Array[String] = Array.empty[String], - appLocator: String = appLocator): Unit = { + appLocator: String = appLocator, + isJVM: Boolean = true ): Unit = { runSparkApplicationAndVerifyCompletion( appResource, SPARK_PI_MAIN_CLASS, @@ -197,7 +215,8 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit appArgs, driverPodChecker, executorPodChecker, - appLocator) + appLocator, + isJVM) } private def runSparkRemoteCheckAndVerifyCompletion( @@ -213,7 +232,8 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit appArgs, driverPodChecker, executorPodChecker, - appLocator) + appLocator, + true) } private def runSparkJVMCheckAndVerifyCompletion( @@ -226,7 +246,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit mainAppResource = appResource, mainClass = mainClass, appArgs = appArgs) - SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir) + SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir, true) val driverPod = kubernetesTestComponents.kubernetesClient .pods() @@ -248,6 +268,8 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } } + + private def runSparkApplicationAndVerifyCompletion( appResource: String, mainClass: String, @@ -255,12 +277,13 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit appArgs: Array[String], driverPodChecker: Pod => Unit, executorPodChecker: Pod => Unit, - appLocator: String): Unit = { + appLocator: String, + isJVM: Boolean): Unit = { val appArguments = SparkAppArguments( mainAppResource = appResource, mainClass = mainClass, appArgs = appArgs) - SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir) + SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir, isJVM) val driverPod = kubernetesTestComponents.kubernetesClient .pods() @@ -298,11 +321,22 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") } + private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = { + assert(driverPod.getMetadata.getName === driverPodName) + assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage) + assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") + } + private def doBasicExecutorPodCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getImage === image) assert(executorPod.getSpec.getContainers.get(0).getName === "executor") } + private def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = { + assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage) + assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + } + private def checkCustomSettings(pod: Pod): Unit = { assert(pod.getMetadata.getLabels.get("label1") === "label1-value") assert(pod.getMetadata.getLabels.get("label2") === "label2-value") @@ -384,7 +418,8 @@ private[spark] object KubernetesSuite { val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest" val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest" - + val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/" + val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py" val TEST_SECRET_NAME_PREFIX = "test-secret-" val TEST_SECRET_KEY = "test-key" val TEST_SECRET_VALUE = "test-data" diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index 0ad29e4..f12f15b 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -99,15 +99,21 @@ private[spark] object SparkAppLauncher extends Logging { appArguments: SparkAppArguments, appConf: SparkAppConf, timeoutSecs: Int, - sparkHomeDir: Path): Unit = { + sparkHomeDir: Path, + isJVM: Boolean): Unit = { val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit")) logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf") - val commandLine = mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath, + val preCommandLine = if (isJVM) { + mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath, "--deploy-mode", "cluster", "--class", appArguments.mainClass, - "--master", appConf.get("spark.master") - ) ++ appConf.toStringArray :+ - appArguments.mainAppResource + "--master", appConf.get("spark.master")) + } else { + mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath, + "--deploy-mode", "cluster", + "--master", appConf.get("spark.master")) + } + val commandLine = preCommandLine ++ appConf.toStringArray :+ appArguments.mainAppResource if (appArguments.appArgs.nonEmpty) { commandLine += appArguments.appArgs.mkString(" ") } From e06454f46f1bfa398293d884d500c7b12f2cde7f Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Thu, 7 Jun 2018 18:59:01 -0400 Subject: [PATCH 2/4] addition of --py-files test --- .../k8s/integrationtest/KubernetesSuite.scala | 48 ++++++++++++++++++- .../KubernetesTestComponents.scala | 8 +++- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index bf28fbf..505e0d0 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -200,6 +200,41 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit isJVM = false) } + test("Run PySpark with Python2 to test a pyfiles example") { + sparkAppConf + .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") + .set("spark.kubernetes.pyspark.pythonversion", "2") + runSparkApplicationAndVerifyCompletion( + appResource = PYSPARK_FILES, + mainClass = "", + expectedLogOnCompletion = Seq( + "Python runtime version check is: True", + "Python environment version check is: True"), + appArgs = Array("python"), + driverPodChecker = doBasicDriverPyPodCheck, + executorPodChecker = doBasicExecutorPyPodCheck, + appLocator = appLocator, + isJVM = false, + pyFiles = Some(PYSPARK_CONTAINER_TESTS)) + } + + test("Run PySpark with Python3 to test a pyfiles example") { + sparkAppConf + .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") + .set("spark.kubernetes.pyspark.pythonversion", "3") + runSparkApplicationAndVerifyCompletion( + appResource = PYSPARK_FILES, + mainClass = "", + expectedLogOnCompletion = Seq( + "Python runtime version check is: True", + "Python environment version check is: True"), + appArgs = Array("python3"), + driverPodChecker = doBasicDriverPyPodCheck, + executorPodChecker = doBasicExecutorPyPodCheck, + appLocator = appLocator, + isJVM = false, + pyFiles = Some(PYSPARK_CONTAINER_TESTS)) + } private def runSparkPiAndVerifyCompletion( appResource: String = containerLocalSparkDistroExamplesJar, @@ -278,12 +313,19 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit driverPodChecker: Pod => Unit, executorPodChecker: Pod => Unit, appLocator: String, - isJVM: Boolean): Unit = { + isJVM: Boolean, + pyFiles: Option[String] = None): Unit = { val appArguments = SparkAppArguments( mainAppResource = appResource, mainClass = mainClass, appArgs = appArgs) - SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir, isJVM) + SparkAppLauncher.launch( + appArguments, + sparkAppConf, + TIMEOUT.value.toSeconds.toInt, + sparkHomeDir, + isJVM, + pyFiles) val driverPod = kubernetesTestComponents.kubernetesClient .pods() @@ -420,6 +462,8 @@ private[spark] object KubernetesSuite { val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest" val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/" val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py" + val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py" + val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py" val TEST_SECRET_NAME_PREFIX = "test-secret-" val TEST_SECRET_KEY = "test-key" val TEST_SECRET_VALUE = "test-data" diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index f12f15b..d019e03 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -100,7 +100,8 @@ private[spark] object SparkAppLauncher extends Logging { appConf: SparkAppConf, timeoutSecs: Int, sparkHomeDir: Path, - isJVM: Boolean): Unit = { + isJVM: Boolean, + pyFiles: Option[String] = None): Unit = { val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit")) logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf") val preCommandLine = if (isJVM) { @@ -113,7 +114,10 @@ private[spark] object SparkAppLauncher extends Logging { "--deploy-mode", "cluster", "--master", appConf.get("spark.master")) } - val commandLine = preCommandLine ++ appConf.toStringArray :+ appArguments.mainAppResource + val commandLine = + pyFiles.map(s => preCommandLine ++ Array("--py-files", s)).getOrElse(preCommandLine) ++ + appConf.toStringArray :+ appArguments.mainAppResource + if (appArguments.appArgs.nonEmpty) { commandLine += appArguments.appArgs.mkString(" ") } From 7d07394cd6331a84992e9c9454c34b1878dff0e0 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 8 Jun 2018 14:33:04 -0400 Subject: [PATCH 3/4] small style --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 505e0d0..e1af229 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -303,8 +303,6 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } } - - private def runSparkApplicationAndVerifyCompletion( appResource: String, mainClass: String, From 4f4cb461a78abe1e36e775b14fd754f189e4f83b Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 8 Jun 2018 15:14:32 -0400 Subject: [PATCH 4/4] small style change to kick off test --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index e1af229..4bdeca8 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -462,6 +462,7 @@ private[spark] object KubernetesSuite { val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py" val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py" val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py" + val TEST_SECRET_NAME_PREFIX = "test-secret-" val TEST_SECRET_KEY = "test-key" val TEST_SECRET_VALUE = "test-data"