From c2f782b5d9da68a7a207269d48b897a1f482e48b Mon Sep 17 00:00:00 2001 From: Qi Shao Date: Wed, 14 Nov 2018 15:04:37 -0500 Subject: [PATCH 1/5] Copy python/pyspark to ${SPARK_HOME}/python/pyspark to make bin/pyspark work properly in in Docker container --- .../docker/src/main/dockerfiles/spark/bindings/python/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile index 69b6efa6149a0..73a5597086f9e 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile @@ -32,6 +32,7 @@ RUN apk add --no-cache python && \ # Removed the .cache to save space rm -r /root/.cache +COPY python/pyspark ${SPARK_HOME}/python/pyspark COPY python/lib ${SPARK_HOME}/python/lib ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip From 4bf6bc6b2b9e12c7a29efe3929c4101db4957d4c Mon Sep 17 00:00:00 2001 From: Qi Shao Date: Mon, 19 Nov 2018 17:25:36 -0500 Subject: [PATCH 2/5] Added test for client mode pyspark shell into PythonTestsSuite --- .../integrationtest/PythonTestsSuite.scala | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala index 904279923334f..47fc557078b6a 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala @@ -16,6 +16,11 @@ */ package org.apache.spark.deploy.k8s.integrationtest +import scala.collection.JavaConverters._ +import org.scalatest.concurrent.Eventually + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT} + private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => import PythonTestsSuite._ @@ -89,6 +94,90 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => isJVM = false, pyFiles = Some(PYSPARK_CONTAINER_TESTS)) } + + test("Run bin/pyspark in client mode", k8sTestTag) { + val labels = Map("spark-app-selector" -> driverPodName) + val driverPort = 7077 + val blockManagerPort = 10000 + val driverService = testBackend + .getKubernetesClient + .services() + .inNamespace(kubernetesTestComponents.namespace) + .createNew() + .withNewMetadata() + .withName(s"$driverPodName-svc") + .endMetadata() + .withNewSpec() + .withClusterIP("None") + .withSelector(labels.asJava) + .addNewPort() + .withName("driver-port") + .withPort(driverPort) + .withNewTargetPort(driverPort) + .endPort() + .addNewPort() + .withName("block-manager") + .withPort(blockManagerPort) + .withNewTargetPort(blockManagerPort) + .endPort() + .endSpec() + .done() + try { + val driverPod = testBackend + .getKubernetesClient + .pods() + .inNamespace(kubernetesTestComponents.namespace) + .createNew() + .withNewMetadata() + .withName(driverPodName) + .withLabels(labels.asJava) + .endMetadata() + .withNewSpec() + .withServiceAccountName(kubernetesTestComponents.serviceAccountName) + .addNewContainer() + .withName("pyspark-example") + .withImage(image) + .withImagePullPolicy("IfNotPresent") + .withCommand("/opt/spark/bin/pyspark") + .addToArgs("--master", s"k8s://https://kubernetes.default.svc") + .addToArgs("--deploy-mode", "client") + .addToArgs("--conf", s"spark.kubernetes.container.image="+pyImage) + .addToArgs( + "--conf", + s"spark.kubernetes.namespace=${kubernetesTestComponents.namespace}") + .addToArgs("--conf", "spark.kubernetes.authenticate.oauthTokenFile=" + + "/var/run/secrets/kubernetes.io/serviceaccount/token") + .addToArgs("--conf", "spark.kubernetes.authenticate.caCertFile=" + + "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") + .addToArgs("--conf", s"spark.kubernetes.driver.pod.name=$driverPodName") + .addToArgs("--conf", "spark.executor.memory=500m") + .addToArgs("--conf", "spark.executor.cores=1") + .addToArgs("--conf", "spark.executor.instances=1") + .addToArgs("--conf", + s"spark.driver.host=" + + s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc") + .addToArgs("--conf", s"spark.driver.port=$driverPort") + .addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort") + .endContainer() + .endSpec() + .done() + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPodName) + .getLog + .contains("SparkSession available"), "The application did not complete.") + } + } finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents + .kubernetesClient + .services() + .inNamespace(kubernetesTestComponents.namespace) + .delete(driverService) + } + } + } private[spark] object PythonTestsSuite { From c626bd1cc3ed754502373a1df4fb8943cf56fde6 Mon Sep 17 00:00:00 2001 From: Qi Shao Date: Tue, 20 Nov 2018 09:56:01 -0500 Subject: [PATCH 3/5] Correct pyspark shell test's driver image --- .../spark/deploy/k8s/integrationtest/PythonTestsSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala index 47fc557078b6a..d5f7cf056f828 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala @@ -95,7 +95,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => pyFiles = Some(PYSPARK_CONTAINER_TESTS)) } - test("Run bin/pyspark in client mode", k8sTestTag) { + test("Run PySpark shell", k8sTestTag) { val labels = Map("spark-app-selector" -> driverPodName) val driverPort = 7077 val blockManagerPort = 10000 @@ -135,8 +135,8 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => .withNewSpec() .withServiceAccountName(kubernetesTestComponents.serviceAccountName) .addNewContainer() - .withName("pyspark-example") - .withImage(image) + .withName("pyspark-shell") + .withImage(pyImage) .withImagePullPolicy("IfNotPresent") .withCommand("/opt/spark/bin/pyspark") .addToArgs("--master", s"k8s://https://kubernetes.default.svc") From 149bed4c7083d04a7a05e931c8584670fc937cb2 Mon Sep 17 00:00:00 2001 From: Qi Shao Date: Thu, 29 Nov 2018 14:30:44 -0500 Subject: [PATCH 4/5] Remove erroring test case for pyspark shell --- .../integrationtest/PythonTestsSuite.scala | 91 +------------------ 1 file changed, 1 insertion(+), 90 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala index d5f7cf056f828..35941225a7dc1 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala @@ -16,11 +16,6 @@ */ package org.apache.spark.deploy.k8s.integrationtest -import scala.collection.JavaConverters._ -import org.scalatest.concurrent.Eventually - -import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT} - private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => import PythonTestsSuite._ @@ -94,90 +89,6 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => isJVM = false, pyFiles = Some(PYSPARK_CONTAINER_TESTS)) } - - test("Run PySpark shell", k8sTestTag) { - val labels = Map("spark-app-selector" -> driverPodName) - val driverPort = 7077 - val blockManagerPort = 10000 - val driverService = testBackend - .getKubernetesClient - .services() - .inNamespace(kubernetesTestComponents.namespace) - .createNew() - .withNewMetadata() - .withName(s"$driverPodName-svc") - .endMetadata() - .withNewSpec() - .withClusterIP("None") - .withSelector(labels.asJava) - .addNewPort() - .withName("driver-port") - .withPort(driverPort) - .withNewTargetPort(driverPort) - .endPort() - .addNewPort() - .withName("block-manager") - .withPort(blockManagerPort) - .withNewTargetPort(blockManagerPort) - .endPort() - .endSpec() - .done() - try { - val driverPod = testBackend - .getKubernetesClient - .pods() - .inNamespace(kubernetesTestComponents.namespace) - .createNew() - .withNewMetadata() - .withName(driverPodName) - .withLabels(labels.asJava) - .endMetadata() - .withNewSpec() - .withServiceAccountName(kubernetesTestComponents.serviceAccountName) - .addNewContainer() - .withName("pyspark-shell") - .withImage(pyImage) - .withImagePullPolicy("IfNotPresent") - .withCommand("/opt/spark/bin/pyspark") - .addToArgs("--master", s"k8s://https://kubernetes.default.svc") - .addToArgs("--deploy-mode", "client") - .addToArgs("--conf", s"spark.kubernetes.container.image="+pyImage) - .addToArgs( - "--conf", - s"spark.kubernetes.namespace=${kubernetesTestComponents.namespace}") - .addToArgs("--conf", "spark.kubernetes.authenticate.oauthTokenFile=" + - "/var/run/secrets/kubernetes.io/serviceaccount/token") - .addToArgs("--conf", "spark.kubernetes.authenticate.caCertFile=" + - "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") - .addToArgs("--conf", s"spark.kubernetes.driver.pod.name=$driverPodName") - .addToArgs("--conf", "spark.executor.memory=500m") - .addToArgs("--conf", "spark.executor.cores=1") - .addToArgs("--conf", "spark.executor.instances=1") - .addToArgs("--conf", - s"spark.driver.host=" + - s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc") - .addToArgs("--conf", s"spark.driver.port=$driverPort") - .addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort") - .endContainer() - .endSpec() - .done() - Eventually.eventually(TIMEOUT, INTERVAL) { - assert(kubernetesTestComponents.kubernetesClient - .pods() - .withName(driverPodName) - .getLog - .contains("SparkSession available"), "The application did not complete.") - } - } finally { - // Have to delete the service manually since it doesn't have an owner reference - kubernetesTestComponents - .kubernetesClient - .services() - .inNamespace(kubernetesTestComponents.namespace) - .delete(driverService) - } - } - } private[spark] object PythonTestsSuite { @@ -187,4 +98,4 @@ private[spark] object PythonTestsSuite { val PYSPARK_FILES: String = TEST_LOCAL_PYSPARK + "pyfiles.py" val PYSPARK_CONTAINER_TESTS: String = TEST_LOCAL_PYSPARK + "py_container_checks.py" val PYSPARK_MEMORY_CHECK: String = TEST_LOCAL_PYSPARK + "worker_memory_check.py" -} +} \ No newline at end of file From 5f6aaf83ddaa6ff3da88a3aa5d78bef925f0bf80 Mon Sep 17 00:00:00 2001 From: Qi Shao Date: Fri, 30 Nov 2018 20:40:14 -0500 Subject: [PATCH 5/5] Updated docker-image-tool.sh with copying pyspark --- bin/docker-image-tool.sh | 1 + .../spark/deploy/k8s/integrationtest/PythonTestsSuite.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh index fbf9c9e448fd1..4f66137eb1c7a 100755 --- a/bin/docker-image-tool.sh +++ b/bin/docker-image-tool.sh @@ -107,6 +107,7 @@ function create_dev_build_context {( "$PYSPARK_CTX/kubernetes/dockerfiles" mkdir "$PYSPARK_CTX/python" cp -r "python/lib" "$PYSPARK_CTX/python/lib" + cp -r "python/pyspark" "$PYSPARK_CTX/python/pyspark" local R_CTX="$CTX_DIR/sparkr" mkdir -p "$R_CTX/kubernetes" diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala index 35941225a7dc1..904279923334f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala @@ -98,4 +98,4 @@ private[spark] object PythonTestsSuite { val PYSPARK_FILES: String = TEST_LOCAL_PYSPARK + "pyfiles.py" val PYSPARK_CONTAINER_TESTS: String = TEST_LOCAL_PYSPARK + "py_container_checks.py" val PYSPARK_MEMORY_CHECK: String = TEST_LOCAL_PYSPARK + "worker_memory_check.py" -} \ No newline at end of file +}