From 5c60e09f422430b677ae7139287bda5d081eec81 Mon Sep 17 00:00:00 2001
From: Ilan Filonenko
Date: Fri, 30 Jun 2017 16:05:54 -0700
Subject: [PATCH] Small edits to docs, tests, and python specifics

---
 .../deploy/kubernetes/submit/Client.scala          | 16 ++++++++++
 .../submit/submitsteps/PythonStep.scala            |  2 +-
 .../integrationtest/KubernetesSuite.scala          | 31 ++++++++++++-------
 .../minikube/MinikubeTestBackend.scala             |  2 +-
 4 files changed, 37 insertions(+), 14 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index d6a765235b53..e4eb353796b1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -78,6 +78,13 @@ private[spark] class Client(
   private val driverJavaOptions = submissionSparkConf.get(
     org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
 
+  /**
+   * Runs the driver submission. Initializes a KubernetesDriverSpec that is
+   * updated in turn by each KubernetesSubmissionStep in the sequence that
+   * is passed to this client. The final driver spec is used to build the
+   * driver container, the driver pod, and any additional Kubernetes
+   * resources the application requires.
+   */
   def run(): Unit = {
     // Set new metadata and a new spec so that submission steps can use PodBuilder#editMetadata()
     // and/or PodBuilder#editSpec() safely.
@@ -87,6 +94,8 @@ private[spark] class Client(
       driverContainer = new ContainerBuilder().build(),
       driverSparkConf = submissionSparkConf.clone(),
       otherKubernetesResources = Seq.empty[HasMetadata])
+    // Apply each submission step in order; every step refines the driver spec
+    // to resolve the client arguments that were passed in.
     for (nextStep <- submissionSteps) {
       currentDriverSpec = nextStep.prepareSubmission(currentDriverSpec)
     }
@@ -186,6 +195,13 @@ private[spark] object Client {
     }
   }
 
+  /**
+   * Entry point invoked by SparkSubmit in spark-core.
+   *
+   * @param args array of strings in which identifiers alternate with values;
+   *             ClientArguments parses the array by matching each value to
+   *             the identifier that precedes it
+   */
   def main(args: Array[String]): Unit = {
     val parsedArguments = ClientArguments.fromCommandLineArgs(args)
     val sparkConf = new SparkConf()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/PythonStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/PythonStep.scala
index bb6d3da6a372..a16f1eac40ee 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/PythonStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/PythonStep.scala
@@ -28,7 +28,7 @@ private[spark] class PythonStep(
 
   override def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
     val resolvedOtherPyFilesString = if (otherPyFiles.isEmpty) {
-      "no-py-files"
+      "null"
     } else {
       otherPyFiles.mkString(",")
     }
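For context, the run() flow documented above threads one driver spec through every submission step in order. The following is a minimal, self-contained sketch of that chaining pattern; DriverSpec, SubmissionStep, LabelStep, and StepChainingExample are simplified illustrative stand-ins, not the real classes in this module.

// Simplified stand-ins for KubernetesDriverSpec and KubernetesSubmissionStep.
case class DriverSpec(labels: Map[String, String] = Map.empty)

trait SubmissionStep {
  def prepareSubmission(spec: DriverSpec): DriverSpec
}

// A step that adds one label, standing in for real steps such as PythonStep.
class LabelStep(key: String, value: String) extends SubmissionStep {
  override def prepareSubmission(spec: DriverSpec): DriverSpec =
    spec.copy(labels = spec.labels + (key -> value))
}

object StepChainingExample extends App {
  val steps: Seq[SubmissionStep] =
    Seq(new LabelStep("spark-app-name", "pi"), new LabelStep("spark-role", "driver"))
  // Equivalent to the for-loop in Client.run(): each step receives the spec
  // produced by the previous one.
  val finalSpec = steps.foldLeft(DriverSpec()) { (spec, step) =>
    step.prepareSubmission(spec)
  }
  println(finalSpec.labels)  // Map(spark-app-name -> pi, spark-role -> driver)
}

Because every step only consumes and returns a spec, the orchestrator can assemble different step sequences for different client arguments without the steps knowing about each other.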
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index 650ec4feb6a2..c6cd6a74c88d 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -72,7 +72,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     kubernetesTestComponents.deleteNamespace()
   }
 
-  test("Run PySpark Job on file from SUBMITTER") {
+  test("Run PySpark Job on file from SUBMITTER with --py-files") {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
     launchStagingServer(SSLOptions(), None)
@@ -82,7 +82,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       .set(EXECUTOR_DOCKER_IMAGE,
         System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))
 
-    runPySparkPiAndVerifyCompletion(PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION, Seq.empty[String])
+    runPySparkPiAndVerifyCompletion(
+      PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION,
+      Seq(PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION)
+    )
   }
 
   test("Run PySpark Job on file from CONTAINER with spark.jar defined") {
@@ -154,7 +157,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
       GROUP_BY_MAIN_CLASS,
-      "The Result is",
+      Seq("The Result is"),
       Array.empty[String],
       Seq.empty[String])
   }
@@ -218,7 +221,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
       FILE_EXISTENCE_MAIN_CLASS,
-      s"File found at /opt/spark/${testExistenceFile.getName} with correct contents.",
+      Seq(s"File found at /opt/spark/${testExistenceFile.getName} with correct contents."),
       Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS),
       Seq.empty[String])
   }
@@ -250,7 +253,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(appResource),
       SPARK_PI_MAIN_CLASS,
-      "Pi is roughly 3",
+      Seq("Pi is roughly 3"),
       Array.empty[String],
       Seq.empty[String])
   }
@@ -260,7 +263,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     runSparkApplicationAndVerifyCompletion(
       PythonMainAppResource(appResource),
       PYSPARK_PI_MAIN_CLASS,
-      "Pi is roughly 3",
+      Seq("Submitting 5 missing tasks from ResultStage", "Pi is roughly 3"),
       Array("5"),
       otherPyFiles)
   }
@@ -268,7 +271,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   private def runSparkApplicationAndVerifyCompletion(
       appResource: MainAppResource,
       mainClass: String,
-      expectedLogOnCompletion: String,
+      expectedLogOnCompletion: Seq[String],
       appArgs: Array[String],
       otherPyFiles: Seq[String]): Unit = {
     val clientArguments = ClientArguments(
@@ -284,11 +287,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       .getItems
       .get(0)
     Eventually.eventually(TIMEOUT, INTERVAL) {
-      assert(kubernetesTestComponents.kubernetesClient
-        .pods()
-        .withName(driverPod.getMetadata.getName)
-        .getLog
-        .contains(expectedLogOnCompletion), "The application did not complete.")
+      expectedLogOnCompletion.foreach { e =>
+        assert(kubernetesTestComponents.kubernetesClient
+          .pods()
+          .withName(driverPod.getMetadata.getName)
+          .getLog
+          .contains(e), "The application did not complete.")
+      }
     }
   }
 
@@ -356,6 +361,8 @@ private[spark] object KubernetesSuite {
   val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner"
   val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION =
     "local:///opt/spark/examples/src/main/python/pi.py"
+  val PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION =
+    "local:///opt/spark/examples/src/main/python/sort.py"
   val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py"
   val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
     ".integrationtest.jobs.FileExistenceTest"
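The Seq-based verification above amounts to polling the driver pod's log until every expected marker appears. A minimal sketch of that check in isolation; LogCheckExample, verifyDriverLog, and fetchDriverLog are hypothetical names, since the real suite inlines this logic inside Eventually.eventually(TIMEOUT, INTERVAL) using the Fabric8 Kubernetes client.

object LogCheckExample extends App {
  // Fetch one snapshot of the log and check every marker against it.
  def verifyDriverLog(expected: Seq[String], fetchDriverLog: () => String): Unit = {
    val log = fetchDriverLog()
    expected.foreach { marker =>
      assert(log.contains(marker), s"Expected log line not found: $marker")
    }
  }

  // Usage with a canned log standing in for the driver pod's output:
  verifyDriverLog(
    Seq("Submitting 5 missing tasks from ResultStage", "Pi is roughly 3"),
    () => "INFO Submitting 5 missing tasks from ResultStage 0\nPi is roughly 3.14")
  println("all markers found")
}

Note that the patched test re-fetches the pod log once per marker; fetching a single snapshot, as sketched here, would save one API round-trip per marker, though under Eventually's retry loop both forms converge to the same result.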
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala
index 45f702fa4897..461264877edc 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala
@@ -27,7 +27,7 @@ private[spark] class MinikubeTestBackend extends IntegrationTestBackend {
 
   override def initialize(): Unit = {
     Minikube.startMinikube()
-//    new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
+    new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
     defaultClient = Minikube.getKubernetesClient
   }
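Finally, the scaladoc added to Client.main() describes args as identifiers alternating with the values they introduce. A hedged sketch of that layout; ArgPairsExample, parsePairs, and the flag names are illustrative only, not the real ClientArguments parser or its flags.

object ArgPairsExample extends App {
  // Pair up an alternating identifier/value array: element 0 names element 1,
  // element 2 names element 3, and so on.
  def parsePairs(args: Array[String]): Map[String, String] =
    args.grouped(2).collect { case Array(id, value) => id -> value }.toMap

  // Flag names below are hypothetical:
  val parsed = parsePairs(Array("--main-py-file", "pi.py", "--other-py-files", "sort.py"))
  println(parsed)  // Map(--main-py-file -> pi.py, --other-py-files -> sort.py)
}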