From c18301022c4a84d710b0cc2f0e346ef250315d13 Mon Sep 17 00:00:00 2001
From: Stavros Kontopoulos
Date: Thu, 19 Sep 2019 12:32:05 +0300
Subject: [PATCH 1/2] support python deps

---
 .../scala/org/apache/spark/SparkContext.scala |  4 +-
 .../org/apache/spark/deploy/SparkSubmit.scala |  1 +
 .../spark/deploy/k8s/KubernetesUtils.scala    | 13 +++-
 .../k8s/features/BasicDriverFeatureStep.scala |  2 +-
 .../features/DriverCommandFeatureStep.scala   | 13 +++-
 .../k8s/integrationtest/DepsTestsSuite.scala  | 76 ++++++++++++++++++-
 .../deploy/k8s/integrationtest/Utils.scala    | 31 +++++---
 7 files changed, 118 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d68015454de9..0440a9de6ab3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1542,8 +1542,8 @@ class SparkContext(config: SparkConf) extends Logging {
     val schemeCorrectedURI = uri.getScheme match {
       case null => new File(path).getCanonicalFile.toURI
       case "local" =>
-        logWarning("File with 'local' scheme is not supported to add to file server, since " +
-          "it is already available on every node.")
+        logWarning(s"File with 'local' scheme $path is not supported to add to file server, " +
+          s"since it is already available on every node.")
         return
       case _ => uri
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 9a316e8c5b5a..4b1766149680 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -391,6 +391,7 @@ private[spark] class SparkSubmit extends Logging {
           downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
         }.orNull
         args.files = renameResourcesToLocalFS(args.files, localFiles)
+        args.pyFiles = renameResourcesToLocalFS(args.pyFiles, localPyFiles)
       }
     }

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index e8bf8f9c9b50..7e5edd905781 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -261,12 +261,19 @@ private[spark] object KubernetesUtils extends Logging {
     isLocalDependency(Utils.resolveURI(resource))
   }

-  def renameMainAppResource(resource: String, conf: SparkConf): String = {
+  def renameMainAppResource(
+      resource: String,
+      conf: Option[SparkConf] = None,
+      shouldUploadLocal: Boolean): String = {
     if (isLocalAndResolvable(resource)) {
-      SparkLauncher.NO_RESOURCE
+      if (shouldUploadLocal) {
+        uploadFileUri(resource, conf)
+      } else {
+        SparkLauncher.NO_RESOURCE
+      }
     } else {
       resource
-    }
+   }
   }

   def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 6503bc823ec0..f5ba261c8f40 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -159,7 +159,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
       MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
     // try upload local, resolvable files to a hadoop compatible file system
-    Seq(JARS, FILES).foreach { key =>
+    Seq(JARS, FILES, SUBMIT_PYTHON_FILES).foreach { key =>
       val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
       val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
       if (resolved.nonEmpty) {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
index ebe44855f1d0..d49381ba897d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -62,7 +62,11 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   }

   private def configureForJava(pod: SparkPod, res: String): SparkPod = {
-    val driverContainer = baseDriverContainer(pod, res).build()
+    // re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit
+    // no uploading takes place here
+    val newResName = KubernetesUtils
+      .renameMainAppResource(resource = res, shouldUploadLocal = false)
+    val driverContainer = baseDriverContainer(pod, newResName).build()
     SparkPod(pod.pod, driverContainer)
   }

@@ -73,7 +77,10 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
         .withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION))
         .build())

-    val pythonContainer = baseDriverContainer(pod, res)
+    // re-write primary resource to be the remote one and upload the related file
+    val newResName = KubernetesUtils
+      .renameMainAppResource(res, Option(conf.sparkConf), true)
+    val pythonContainer = baseDriverContainer(pod, newResName)
       .addAllToEnv(pythonEnvs.asJava)
       .build()

@@ -88,7 +95,7 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = {
     // re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit
     val resolvedResource = if (conf.mainAppResource.isInstanceOf[JavaMainAppResource]) {
-      KubernetesUtils.renameMainAppResource(resource, conf.sparkConf)
+      KubernetesUtils.renameMainAppResource(resource, Option(conf.sparkConf), false)
     } else {
       resource
     }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
index e712b95cdbce..d08f7440416f 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -120,16 +120,18 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .endSpec()
       .build()

-    kubernetesTestComponents
+    // try until the service from a previous test is deleted
+    Eventually.eventually(TIMEOUT, INTERVAL) (kubernetesTestComponents
       .kubernetesClient
       .services()
-      .create(minioService)
+      .create(minioService))

-    kubernetesTestComponents
+    // try until the stateful set of a previous test is deleted
+    Eventually.eventually(TIMEOUT, INTERVAL) (kubernetesTestComponents
       .kubernetesClient
       .apps()
       .statefulSets()
-      .create(minioStatefulSet)
+      .create(minioStatefulSet))
   }

   private def deleteMinioStorage(): Unit = {
@@ -138,12 +140,14 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .apps()
       .statefulSets()
       .withName(cName)
+      .withGracePeriod(0)
       .delete()

     kubernetesTestComponents
       .kubernetesClient
       .services()
       .withName(svcName)
+      .withGracePeriod(0)
       .delete()
   }

@@ -181,6 +185,32 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     }
   }

+  test("Launcher python client dependencies using a zip file", k8sTestTag, MinikubeTag) {
+    val inDepsFile = Utils.getTestFileAbsolutePath("py_container_checks.py", sparkHomeDir)
+    val outDepsFile = s"${inDepsFile.substring(0, inDepsFile.lastIndexOf("."))}.zip"
+    Utils.createZipFile(inDepsFile, outDepsFile)
+    testPythonDeps(outDepsFile)
+  }
+
+  private def testPythonDeps(depsFile: String): Unit = {
+    tryDepsTest({
+      val pySparkFiles = Utils.getTestFileAbsolutePath("pyfiles.py", sparkHomeDir)
+      setPythonSparkConfProperties(sparkAppConf)
+      runSparkApplicationAndVerifyCompletion(
+        appResource = pySparkFiles,
+        mainClass = "",
+        expectedLogOnCompletion = Seq(
+          "Python runtime version check is: True",
+          "Python environment version check is: True",
+          "Python runtime version check for executor is: True"),
+        appArgs = Array("python3"),
+        driverPodChecker = doBasicDriverPyPodCheck,
+        executorPodChecker = doBasicExecutorPyPodCheck,
+        appLocator = appLocator,
+        isJVM = false,
+        pyFiles = Option(depsFile)) })
+  }
+
   private def extractS3Key(data: String, key: String): String = {
     data.split("\n")
       .filter(_.contains(key))
@@ -222,6 +252,44 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       url
     }
   }
+
+  private def getServiceHostAndPort(minioUrlStr : String) : (String, Int) = {
+    val minioUrl = new URL(minioUrlStr)
+    (minioUrl.getHost, minioUrl.getPort)
+  }
+
+  private def setCommonSparkConfPropertiesForS3Access(
+      conf: SparkAppConf,
+      minioUrlStr: String): Unit = {
+    val (minioHost, minioPort) = getServiceHostAndPort(minioUrlStr)
+    conf.set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
+      .set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
+      .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+      .set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort")
+      .set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET")
+      .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+      .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" +
+        "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6")
+      .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
+  }
+
+  private def setPythonSparkConfProperties(conf: SparkAppConf): Unit = {
+    sparkAppConf.set("spark.kubernetes.container.image", pyImage)
+      .set("spark.kubernetes.pyspark.pythonVersion", "3")
+  }
+
+  private def tryDepsTest(runTest: => Unit): Unit = {
+    try {
+      setupMinioStorage()
+      val minioUrlStr = getServiceUrl(svcName)
+      createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr)
+      setCommonSparkConfPropertiesForS3Access(sparkAppConf, minioUrlStr)
+      runTest
+    } finally {
+      // make sure this always runs
+      deleteMinioStorage()
+    }
+  }
 }

 private[spark] object DepsTestsSuite {
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index 9bcd6e950353..d00716a7a47d 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -16,15 +16,17 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest

-import java.io.{Closeable, File, PrintWriter}
+import java.io.{Closeable, File, FileInputStream, FileOutputStream, PrintWriter}
 import java.nio.file.{Files, Path}
 import java.util.concurrent.CountDownLatch
+import java.util.zip.{ZipEntry, ZipOutputStream}

 import scala.collection.JavaConverters._
 import scala.util.Try

 import io.fabric8.kubernetes.client.dsl.ExecListener
 import okhttp3.Response
+import org.apache.commons.compress.utils.IOUtils
 import org.apache.commons.io.output.ByteArrayOutputStream
 import org.apache.hadoop.util.VersionInfo

@@ -114,23 +116,22 @@ object Utils extends Logging {
     filename
   }

-  def getExamplesJarAbsolutePath(sparkHomeDir: Path): String = {
-    val jarName = getExamplesJarName()
-    val jarPathsFound = Files
+  def getTestFileAbsolutePath(fileName: String, sparkHomeDir: Path): String = {
+    val filePathsFound = Files
       .walk(sparkHomeDir)
       .filter(Files.isRegularFile(_))
-      .filter((f: Path) => {f.toFile.getName == jarName})
+      .filter((f: Path) => {f.toFile.getName == fileName})
     // we should not have more than one here under current test build dir
     // we only need one though
-    val jarPath = jarPathsFound
+    val filePath = filePathsFound
       .iterator()
       .asScala
       .map(_.toAbsolutePath.toString)
       .toArray
       .headOption
-    jarPath match {
-      case Some(jar) => jar
-      case _ => throw new SparkException(s"No valid $jarName file was found " +
+    filePath match {
+      case Some(file) => file
+      case _ => throw new SparkException(s"No valid $fileName file was found " +
         s"under spark home test dir ${sparkHomeDir.toAbsolutePath}!")
     }
   }
@@ -138,4 +139,16 @@ object Utils extends Logging {
   def isHadoop3(): Boolean = {
     VersionInfo.getVersion.startsWith("3")
   }
+
+  def createZipFile(inFile: String, outFile: String): Unit = {
+    val fileToZip = new File(inFile)
+    val fis = new FileInputStream(fileToZip)
+    val fos = new FileOutputStream(outFile)
+    val zipOut = new ZipOutputStream(fos)
+    val zipEntry = new ZipEntry(fileToZip.getName)
+    zipOut.putNextEntry(zipEntry)
+    IOUtils.copy(fis, zipOut)
+    IOUtils.closeQuietly(fis)
+    IOUtils.closeQuietly(zipOut)
+  }
 }

From 0145183265a1a27c38cd853c3bba53df0b8552e6 Mon Sep 17 00:00:00 2001
From: Stavros Kontopoulos
Date: Wed, 18 Nov 2020 15:51:50 +0200
Subject: [PATCH 2/2] update dep it

---
 .../k8s/integrationtest/DepsTestsSuite.scala  | 42 ++++++-------------
 1 file changed, 12 insertions(+), 30 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
index d08f7440416f..8f6e9cd8af74 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.time.{Minutes, Span}
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT, FILE_CONTENTS, HOST_PATH}
 import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, TIMEOUT}
+import org.apache.spark.deploy.k8s.integrationtest.Utils.getExamplesJarName
 import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube

 private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
@@ -152,37 +153,14 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
   }

   test("Launcher client dependencies", k8sTestTag, MinikubeTag) {
-    val packages = if (Utils.isHadoop3) {
-      "org.apache.hadoop:hadoop-aws:3.2.0"
-    } else {
-      "com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6"
-    }
-    val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
-    try {
-      setupMinioStorage()
-      val minioUrlStr = getServiceUrl(svcName)
-      val minioUrl = new URL(minioUrlStr)
-      val minioHost = minioUrl.getHost
-      val minioPort = minioUrl.getPort
-      val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir)
-      sparkAppConf
-        .set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
-        .set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
-        .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
-        .set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort")
-        .set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET")
-        .set("spark.files", s"$HOST_PATH/$fileName")
-        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
-        .set("spark.jars.packages", packages)
-        .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
-      createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr)
+    tryDepsTest({
+      val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
+      sparkAppConf.set("spark.files", s"$HOST_PATH/$fileName")
+      val examplesJar = Utils.getTestFileAbsolutePath(getExamplesJarName(), sparkHomeDir)
       runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar,
         appArgs = Array(fileName),
         timeout = Option(DEPS_TIMEOUT))
-    } finally {
-      // make sure this always runs
-      deleteMinioStorage()
-    }
+    })
   }

   test("Launcher python client dependencies using a zip file", k8sTestTag, MinikubeTag) {
@@ -262,14 +240,18 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       conf: SparkAppConf,
       minioUrlStr: String): Unit = {
     val (minioHost, minioPort) = getServiceHostAndPort(minioUrlStr)
+    val packages = if (Utils.isHadoop3) {
+      "org.apache.hadoop:hadoop-aws:3.2.0"
+    } else {
+      "com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6"
+    }
     conf.set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
       .set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
       .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
       .set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort")
       .set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET")
       .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
-      .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" +
-        "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6")
+      .set("spark.jars.packages", packages)
       .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
   }
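Note (not part of the patches above): a minimal sketch of the client-side configuration this series targets, i.e. pointing spark.kubernetes.file.upload.path at an S3-compatible store so that a local --py-files archive is staged there and rewritten to its remote URI before the driver pod is built. The endpoint, bucket, credentials, and image name below are hypothetical placeholders; the configuration keys mirror the ones set in DepsTestsSuite.

import org.apache.spark.SparkConf

// Hypothetical sketch only; values are placeholders, keys come from the integration test above.
object PyDepsConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // PySpark container image used for driver and executors (assumed name)
      .set("spark.kubernetes.container.image", "spark-py:latest")
      // Hadoop-compatible staging area for local dependencies (assumed S3A bucket)
      .set("spark.kubernetes.file.upload.path", "s3a://spark-deps")
      // S3A endpoint and credentials, e.g. a minio service as in the test suite (assumed values)
      .set("spark.hadoop.fs.s3a.endpoint", "minio.example.svc:9000")
      .set("spark.hadoop.fs.s3a.access.key", "ACCESS")
      .set("spark.hadoop.fs.s3a.secret.key", "SECRET")
      .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
      .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    // With these patches, spark-submit in cluster mode uploads a local --py-files
    // zip to the upload path above and hands the resulting remote URI to the driver.
    println(conf.toDebugString)
  }
}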