From a67f6a15070fca172f05e277036805d88190f45e Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Tue, 5 Sep 2017 16:18:36 +0800
Subject: [PATCH 01/10] Download remote http(s) resources to local in yarn mode

Change-Id: I7897817ceaaafecd779a6e085c96d2a28363d7d6
---
 .../org/apache/spark/deploy/SparkSubmit.scala | 36 +++++++++++++++++++
 .../spark/deploy/SparkSubmitSuite.scala       | 33 +++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index ea9c9bdaede76..0ae13e2d9ce35 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -367,6 +367,42 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
 
+    if (clusterManager == YARN) {
+      // This security manager will not need an auth secret, but set a dummy value in case
+      // spark.authenticate is enabled, otherwise an exception is thrown.
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+
+      def downloadHttpResource(resource: String): String = {
+        val uri = Utils.resolveURI(resource)
+        uri.getScheme match {
+          case "local" | "file" => resource
+          case "http" | "https" | "ftp" =>
+            if (deployMode == CLIENT) {
+              val fileName = new Path(uri).getName
+              new File(targetDir, fileName).toURI.toString
+            } else {
+              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+            }
+          case _ => uri.toString
+        }
+      }
+
+      args.primaryResource = Option(args.primaryResource).map { downloadHttpResource }.orNull
+      args.files = Option(args.files).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadHttpResource }.mkString(",")
+      }.orNull
+      args.pyFiles = Option(args.pyFiles).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadHttpResource }.mkString(",")
+      }.orNull
+      args.jars = Option(args.jars).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadHttpResource }.mkString(",")
+      }.orNull
+      args.archives = Option(args.archives).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadHttpResource }.mkString(",")
+      }.orNull
+    }
+
     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
       if (args.primaryResource == PYSPARK_SHELL) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 4d69ce844d2ea..273831ec858b2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -897,6 +897,39 @@ class SparkSubmitSuite
     sysProps("spark.submit.pyFiles") should (startWith("/"))
   }
 
+  test("handle remote http(s) resources in yarn mode") {
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
+
+    val tmpDir = Utils.createTempDir()
+    val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+    val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
+    // This assumes UT environment could access external network.
+ val remoteHttpJar = + "http://central.maven.org/maven2/io/netty/netty-all/4.0.51.Final/netty-all-4.0.51.Final.jar" + + val args = Seq( + "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"), + "--name", "testApp", + "--master", "yarn", + "--deploy-mode", "client", + "--jars", s"$tmpJarPath,$remoteHttpJar", + s"s3a://$mainResource" + ) + + val appArgs = new SparkSubmitArguments(args) + val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3 + + // Resources in S3 should still be remote path, but remote http resource will be downloaded + // to local. + val jars = sysProps("spark.yarn.dist.jars").split(",").toSet + jars.contains(tmpJarPath) should be (true) + val nettyJar = jars.filter(_.contains("netty")) + nettyJar.size should be (1) + nettyJar.head should startWith("file:") + } + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. private def runSparkSubmit(args: Seq[String]): Unit = { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) From 96720e858ca0caf1512d475e899896f5ed29e971 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 6 Sep 2017 15:27:55 +0800 Subject: [PATCH 02/10] Address the comments Change-Id: I9176970799c5aa33f0dbd6556509b2d1f77b6f6b --- .../org/apache/spark/deploy/SparkSubmit.scala | 17 +++++-- .../spark/internal/config/package.scala | 8 +++ .../spark/deploy/SparkSubmitSuite.scala | 49 +++++++++++++++++-- docs/running-on-yarn.md | 9 ++++ 4 files changed, 76 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 0ae13e2d9ce35..b9d6c9151be91 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -25,11 +25,11 @@ import java.text.ParseException import scala.annotation.tailrec import scala.collection.mutable.{ArrayBuffer, HashMap, Map} -import scala.util.Properties +import scala.util.{Properties, Try} import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.{Configuration => HadoopConfiguration} -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.ivy.Ivy @@ -48,6 +48,7 @@ import org.apache.spark._ import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.rest._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util._ @@ -367,7 +368,17 @@ object SparkSubmit extends CommandLineUtils with Logging { }.orNull } - if (clusterManager == YARN) { + // Using a dummy http URI to check if HTTP(s) FileSystem is available, it returns true in + // Hadoop 2.9+, otherwise it returns false. + val isHttpFsAvailable = Try { FileSystem.get(Utils.resolveURI("http://foo/bar"), hadoopConf) } + .map(_ => true) + .getOrElse(false) + // When running in YARN cluster manager, we check the configuration + // "spark.yarn.dist.forceDownloadResources", if true we always download remote HTTP(s) + // resources to local and then re-upload them to Hadoop FS, if false we need to check the + // availability of HTTP(s) FileSystem to decide wether to use HTTP(s) FS to handle resources + // or not. 
+ if (clusterManager == YARN && (sparkConf.get(FORCE_DOWNLOAD_RESOURCES) || !isHttpFsAvailable)) { // This security manager will not need an auth secret, but set a dummy value in case // spark.authenticate is enabled, otherwise an exception is thrown. sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused") diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e0f696080e566..af20684f798c5 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -400,4 +400,12 @@ package object config { .doc("Memory to request as a multiple of the size that used to unroll the block.") .doubleConf .createWithDefault(1.5) + + private[spark] val FORCE_DOWNLOAD_RESOURCES = + ConfigBuilder("spark.yarn.dist.forceDownloadResources") + .doc("Whether to download remote HTTP(s) resources to local and upload to Hadoop FS. " + + "This is only honored in Hadoop 2.9+ environment to bypass the build-in HTTP(s) " + + "FileSystem and use Spark's own logic to handle remote HTTP(s) resources") + .booleanConf + .createWithDefault(false) } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 273831ec858b2..6d8479bcd4ad2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -907,7 +907,8 @@ class SparkSubmitSuite val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}" // This assumes UT environment could access external network. val remoteHttpJar = - "http://central.maven.org/maven2/io/netty/netty-all/4.0.51.Final/netty-all-4.0.51.Final.jar" + "http://central.maven.org/maven2/io/dropwizard/metrics/metrics-core/" + + "3.2.4/metrics-core-3.2.4.jar" val args = Seq( "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"), @@ -925,9 +926,49 @@ class SparkSubmitSuite // to local. val jars = sysProps("spark.yarn.dist.jars").split(",").toSet jars.contains(tmpJarPath) should be (true) - val nettyJar = jars.filter(_.contains("netty")) - nettyJar.size should be (1) - nettyJar.head should startWith("file:") + val metricsJar = jars.filter(_.contains("metrics")) + metricsJar.size should be (1) + metricsJar.head should startWith("file:") + + val hadoopConf1 = new Configuration() + updateConfWithFakeS3Fs(hadoopConf1) + hadoopConf1.set("fs.http.impl", classOf[TestFileSystem].getCanonicalName) + hadoopConf1.set("fs.http.impl.disable.cache", "true") + + val args1 = Seq( + "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"), + "--name", "testApp", + "--master", "yarn", + "--deploy-mode", "client", + "--jars", s"$tmpJarPath,$remoteHttpJar", + s"s3a://$mainResource" + ) + + val appArgs1 = new SparkSubmitArguments(args1) + val sysProps1 = SparkSubmit.prepareSubmitEnvironment(appArgs1, Some(hadoopConf1))._3 + + // If Hadoop FileSystem supports remote HTTP(S) resoures, we will still keep HTTP + // resource URI, not the local one. 
+ val jars1 = sysProps1("spark.yarn.dist.jars").split(",").toSet + jars1.contains(remoteHttpJar) should be (true) + + val args2 = Seq( + "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"), + "--conf", "spark.yarn.dist.forceDownloadResources=true", + "--name", "testApp", + "--master", "yarn", + "--deploy-mode", "client", + "--jars", s"$tmpJarPath,$remoteHttpJar", + s"s3a://$mainResource" + ) + + val appArgs2 = new SparkSubmitArguments(args2) + val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2, Some(hadoopConf1))._3 + + // We explicitly disable using HTTP(S) FileSystem, force downloading remote HTTP(s) + // resources, so the URI should be local one. + val jars2 = sysProps2("spark.yarn.dist.jars").split(",").toSet + jars.filter(_.contains("metrics")).head should startWith("file:") } // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index e4a74556d4f26..36a10505730f3 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -211,6 +211,15 @@ To use a custom metrics.properties for the application master and executors, upd Comma-separated list of jars to be placed in the working directory of each executor. + + spark.yarn.dist.forceDownloadResources + false + + Whether to download remote HTTP(s) resources to local and upload to Hadoop FS. This is only + honored in Hadoop 2.9+ environment to bypass the build-in HTTP(s) FileSystem and use Spark's + own logic to handle remote HTTP(s) resources" + + spark.executor.instances 2 From b3e0961b7a929ab7fef0a3a035fbe834b7c44718 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 12 Sep 2017 14:01:50 +0800 Subject: [PATCH 03/10] Address the comments --- .../org/apache/spark/deploy/SparkSubmit.scala | 41 ++++++++++--------- .../spark/internal/config/package.scala | 14 +++---- .../spark/deploy/SparkSubmitSuite.scala | 2 +- docs/running-on-yarn.md | 9 ++-- 4 files changed, 33 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b9d6c9151be91..1f95a5322575e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -368,28 +368,29 @@ object SparkSubmit extends CommandLineUtils with Logging { }.orNull } - // Using a dummy http URI to check if HTTP(s) FileSystem is available, it returns true in - // Hadoop 2.9+, otherwise it returns false. - val isHttpFsAvailable = Try { FileSystem.get(Utils.resolveURI("http://foo/bar"), hadoopConf) } - .map(_ => true) - .getOrElse(false) - // When running in YARN cluster manager, we check the configuration - // "spark.yarn.dist.forceDownloadResources", if true we always download remote HTTP(s) - // resources to local and then re-upload them to Hadoop FS, if false we need to check the - // availability of HTTP(s) FileSystem to decide wether to use HTTP(s) FS to handle resources - // or not. - if (clusterManager == YARN && (sparkConf.get(FORCE_DOWNLOAD_RESOURCES) || !isHttpFsAvailable)) { - // This security manager will not need an auth secret, but set a dummy value in case - // spark.authenticate is enabled, otherwise an exception is thrown. 
+ // When running in YARN cluster manager, + if (clusterManager == YARN) { sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused") val secMgr = new SecurityManager(sparkConf) + val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES) + + // Check the scheme list provided by "spark.yarn.dist.forceDownloadSchemes" to see if current + // resource's scheme is included in this list, or Hadoop FileSystem doesn't support current + // scheme, if so Spark will download the resources to local disk and upload to Hadoop FS. + def shouldDownload(scheme: String): Boolean = { + val isFsAvailable = Try { FileSystem.getFileSystemClass(scheme, hadoopConf) } + .map(_ => true).getOrElse(false) + forceDownloadSchemes.contains(scheme) || !isFsAvailable + } - def downloadHttpResource(resource: String): String = { + def downloadResource(resource: String): String = { val uri = Utils.resolveURI(resource) uri.getScheme match { case "local" | "file" => resource - case "http" | "https" | "ftp" => + case e if shouldDownload(e) => if (deployMode == CLIENT) { + // In client mode, we already download the resources, so figuring out the local one + // should be enough. val fileName = new Path(uri).getName new File(targetDir, fileName).toURI.toString } else { @@ -399,18 +400,18 @@ object SparkSubmit extends CommandLineUtils with Logging { } } - args.primaryResource = Option(args.primaryResource).map { downloadHttpResource }.orNull + args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull args.files = Option(args.files).map { files => - files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadHttpResource }.mkString(",") + files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",") }.orNull args.pyFiles = Option(args.pyFiles).map { files => - files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadHttpResource }.mkString(",") + files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",") }.orNull args.jars = Option(args.jars).map { files => - files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadHttpResource }.mkString(",") + files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",") }.orNull args.archives = Option(args.archives).map { files => - files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadHttpResource }.mkString(",") + files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",") }.orNull } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index af20684f798c5..d3b7a5ee6e5d2 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -401,11 +401,11 @@ package object config { .doubleConf .createWithDefault(1.5) - private[spark] val FORCE_DOWNLOAD_RESOURCES = - ConfigBuilder("spark.yarn.dist.forceDownloadResources") - .doc("Whether to download remote HTTP(s) resources to local and upload to Hadoop FS. 
" + - "This is only honored in Hadoop 2.9+ environment to bypass the build-in HTTP(s) " + - "FileSystem and use Spark's own logic to handle remote HTTP(s) resources") - .booleanConf - .createWithDefault(false) + private[spark] val FORCE_DOWNLOAD_SCHEMES = + ConfigBuilder("spark.yarn.dist.forceDownloadSchemes") + .doc("Comma-separated list of schemes in which remote resources have to download to local " + + "disk and upload to Hadoop FS.") + .stringConf + .toSequence + .createWithDefault(Nil) } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 6d8479bcd4ad2..bf777aa136909 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -954,7 +954,7 @@ class SparkSubmitSuite val args2 = Seq( "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"), - "--conf", "spark.yarn.dist.forceDownloadResources=true", + "--conf", "spark.yarn.dist.forceDownloadSchemes=http,https", "--name", "testApp", "--master", "yarn", "--deploy-mode", "client", diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 36a10505730f3..b252d08d2eb59 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -212,12 +212,11 @@ To use a custom metrics.properties for the application master and executors, upd - spark.yarn.dist.forceDownloadResources - false + spark.yarn.dist.forceDownloadSchemes + (none) - Whether to download remote HTTP(s) resources to local and upload to Hadoop FS. This is only - honored in Hadoop 2.9+ environment to bypass the build-in HTTP(s) FileSystem and use Spark's - own logic to handle remote HTTP(s) resources" + Comma-separated schemes in which remote resources have to download to local disk and upload + to Hadoop FS. From 95b48ea0c54f21e04cb2e640ea5e04edfd5fbe15 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 14 Sep 2017 16:23:00 +0800 Subject: [PATCH 04/10] Address the comments --- .../apache/spark/deploy/DependencyUtils.scala | 5 + .../org/apache/spark/deploy/SparkSubmit.scala | 25 ++-- .../spark/deploy/SparkSubmitSuite.scala | 112 +++++++++--------- docs/running-on-yarn.md | 5 +- 4 files changed, 75 insertions(+), 72 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala index 51c3d9b158cbe..7d4d8b7b29088 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala @@ -121,6 +121,11 @@ private[deploy] object DependencyUtils { uri.getScheme match { case "file" | "local" => path + case "http" | "https" | "ftp" if Utils.isTesting => + // This is only used for SparkSubmitSuite unit test. Instead of downloading file remotely, + // return a dummy local path instead. 
+        val file = new File(uri.getPath)
+        new File(targetDir, file.getName).toURI.toString
       case _ =>
         val fname = new Path(uri).getName()
         val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, secMgr,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 1f95a5322575e..e66b98d85f951 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -368,19 +368,22 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
 
-    // When running in YARN cluster manager,
+    // When running in YARN, for some remote resources with scheme:
+    // 1. Hadoop FileSystem doesn't support them.
+    // 2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
+    // We will download them to local disk prior to add to YARN's distributed cache.
+    // For yarn client mode, since we already download them with above code, so we only need to
+    // gifure out the local path to replace the remote one.
     if (clusterManager == YARN) {
       sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
       val secMgr = new SecurityManager(sparkConf)
       val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
 
-      // Check the scheme list provided by "spark.yarn.dist.forceDownloadSchemes" to see if current
-      // resource's scheme is included in this list, or Hadoop FileSystem doesn't support current
-      // scheme, if so Spark will download the resources to local disk and upload to Hadoop FS.
       def shouldDownload(scheme: String): Boolean = {
-        val isFsAvailable = Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }
-          .map(_ => true).getOrElse(false)
-        forceDownloadSchemes.contains(scheme) || !isFsAvailable
+        val isFsAvailable = () => {
+          Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isSuccess
+        }
+        forceDownloadSchemes.contains(scheme) || !isFsAvailable()
       }
 
       def downloadResource(resource: String): String = {
         val uri = Utils.resolveURI(resource)
         uri.getScheme match {
           case "local" | "file" => resource
           case e if shouldDownload(e) =>
-            if (deployMode == CLIENT) {
-              // In client mode, we already download the resources, so figuring out the local one
-              // should be enough.
- val fileName = new Path(uri).getName - new File(targetDir, fileName).toURI.toString + val file = new File(targetDir, new Path(uri).getName) + if (file.exists()) { + file.toURI.toString } else { downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr) } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index bf777aa136909..f30243cd26c39 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -897,78 +897,74 @@ class SparkSubmitSuite sysProps("spark.submit.pyFiles") should (startWith("/")) } - test("handle remote http(s) resources in yarn mode") { + test("download remote resource if it is not supported by yarn service") { + testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = false) + } + + test("avoid downloading remote resource if it is supported by yarn service") { + testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = true) + } + + test("force downloading remote resource if it's scheme is configured") { + testRemoteResources(isHttpSchemeBlacklisted = true, supportMockHttpFs = true) + } + + private def testRemoteResources( + isHttpSchemeBlacklisted: Boolean, supportMockHttpFs: Boolean): Unit = { val hadoopConf = new Configuration() updateConfWithFakeS3Fs(hadoopConf) + if (supportMockHttpFs) { + hadoopConf.set("fs.http.impl", classOf[TestFileSystem].getCanonicalName) + hadoopConf.set("fs.http.impl.disable.cache", "true") + } val tmpDir = Utils.createTempDir() val mainResource = File.createTempFile("tmpPy", ".py", tmpDir) - val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir) - val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}" - // This assumes UT environment could access external network. - val remoteHttpJar = - "http://central.maven.org/maven2/io/dropwizard/metrics/metrics-core/" + - "3.2.4/metrics-core-3.2.4.jar" + val tmpS3Jar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir) + val tmpS3JarPath = s"s3a://${new File(tmpS3Jar.toURI).getAbsolutePath}" + val tmpHttpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir) + val tmpHttpJarPath = s"http://${new File(tmpHttpJar.toURI).getAbsolutePath}" val args = Seq( "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"), "--name", "testApp", "--master", "yarn", "--deploy-mode", "client", - "--jars", s"$tmpJarPath,$remoteHttpJar", - s"s3a://$mainResource" - ) - - val appArgs = new SparkSubmitArguments(args) - val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3 - - // Resources in S3 should still be remote path, but remote http resource will be downloaded - // to local. 
- val jars = sysProps("spark.yarn.dist.jars").split(",").toSet - jars.contains(tmpJarPath) should be (true) - val metricsJar = jars.filter(_.contains("metrics")) - metricsJar.size should be (1) - metricsJar.head should startWith("file:") - - val hadoopConf1 = new Configuration() - updateConfWithFakeS3Fs(hadoopConf1) - hadoopConf1.set("fs.http.impl", classOf[TestFileSystem].getCanonicalName) - hadoopConf1.set("fs.http.impl.disable.cache", "true") - - val args1 = Seq( - "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"), - "--name", "testApp", - "--master", "yarn", - "--deploy-mode", "client", - "--jars", s"$tmpJarPath,$remoteHttpJar", - s"s3a://$mainResource" - ) - - val appArgs1 = new SparkSubmitArguments(args1) - val sysProps1 = SparkSubmit.prepareSubmitEnvironment(appArgs1, Some(hadoopConf1))._3 - - // If Hadoop FileSystem supports remote HTTP(S) resoures, we will still keep HTTP - // resource URI, not the local one. - val jars1 = sysProps1("spark.yarn.dist.jars").split(",").toSet - jars1.contains(remoteHttpJar) should be (true) - - val args2 = Seq( - "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"), - "--conf", "spark.yarn.dist.forceDownloadSchemes=http,https", - "--name", "testApp", - "--master", "yarn", - "--deploy-mode", "client", - "--jars", s"$tmpJarPath,$remoteHttpJar", + "--jars", s"$tmpS3JarPath,$tmpHttpJarPath", s"s3a://$mainResource" + ) ++ ( + if (isHttpSchemeBlacklisted) { + Seq("--conf", "spark.yarn.dist.forceDownloadSchemes=http,https") + } else { + Nil + } ) - val appArgs2 = new SparkSubmitArguments(args2) - val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2, Some(hadoopConf1))._3 - - // We explicitly disable using HTTP(S) FileSystem, force downloading remote HTTP(s) - // resources, so the URI should be local one. - val jars2 = sysProps2("spark.yarn.dist.jars").split(",").toSet - jars.filter(_.contains("metrics")).head should startWith("file:") + sys.props.put("spark.testing", "1") + try { + val appArgs = new SparkSubmitArguments(args) + val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3 + + val jars = sysProps("spark.yarn.dist.jars").split(",").toSet + + // The URI of remote S3 resource should still be remote. + assert(jars.contains(tmpS3JarPath)) + + if (supportMockHttpFs) { + // If Http FS is supported by yarn service, the URI of remote http resource should + // still be remote. + assert(jars.contains(tmpHttpJarPath)) + } else { + // If Http FS is not supported by yarn service, or http scheme is configured to be force + // downloading, the URI of remote http resource should be changed to a local one. + val jarName = new File(tmpHttpJar.toURI).getName + val localHttpJar = jars.filter(_.contains(jarName)) + localHttpJar.size should be(1) + localHttpJar.head should startWith("file:") + } + } finally { + sys.props.remove("spark.testing") + } } // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index b252d08d2eb59..98781082cf074 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -215,8 +215,9 @@ To use a custom metrics.properties for the application master and executors, upd spark.yarn.dist.forceDownloadSchemes (none) - Comma-separated schemes in which remote resources have to download to local disk and upload - to Hadoop FS. + Comma-separated list of schemes for which files will be downloaded to the local disk prior to + being added to YARN's distributed cache. 
For use in cases where the YARN service does not + support schemes that are supported by Spark. From 19b7fbf865f3df63006e6ec34167735670830119 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 14 Sep 2017 16:32:36 +0800 Subject: [PATCH 05/10] minor changes --- .../src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../scala/org/apache/spark/internal/config/package.scala | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e66b98d85f951..77161c6e679da 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -373,7 +373,7 @@ object SparkSubmit extends CommandLineUtils with Logging { // 2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes". // We will download them to local disk prior to add to YARN's distributed cache. // For yarn client mode, since we already download them with above code, so we only need to - // gifure out the local path to replace the remote one. + // figure out the local path and replace the remote one. if (clusterManager == YARN) { sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused") val secMgr = new SecurityManager(sparkConf) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index d3b7a5ee6e5d2..0fe4dfedd4f68 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -403,8 +403,9 @@ package object config { private[spark] val FORCE_DOWNLOAD_SCHEMES = ConfigBuilder("spark.yarn.dist.forceDownloadSchemes") - .doc("Comma-separated list of schemes in which remote resources have to download to local " + - "disk and upload to Hadoop FS.") + .doc("Comma-separated list of schemes for which files will be downloaded to the " + + "local disk prior to being added to YARN's distributed cache. 
For use in cases " + + "where the YARN service does not support schemes that are supported by Spark.") .stringConf .toSequence .createWithDefault(Nil) From a01198240913f43913422a8812eb91a8eae18ae7 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 15 Sep 2017 10:36:12 +0800 Subject: [PATCH 06/10] Further address some minor issues --- .../apache/spark/deploy/DependencyUtils.scala | 4 +- .../org/apache/spark/deploy/SparkSubmit.scala | 14 +++--- .../scala/org/apache/spark/util/Utils.scala | 3 ++ .../spark/deploy/SparkSubmitSuite.scala | 43 ++++++++----------- 4 files changed, 31 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala index 7d4d8b7b29088..ecc82d7ac8001 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala @@ -94,7 +94,7 @@ private[deploy] object DependencyUtils { hadoopConf: Configuration, secMgr: SecurityManager): String = { require(fileList != null, "fileList cannot be null.") - fileList.split(",") + Utils.stringToSeq(fileList) .map(downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)) .mkString(",") } @@ -136,7 +136,7 @@ private[deploy] object DependencyUtils { def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = { require(paths != null, "paths cannot be null.") - paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path => + Utils.stringToSeq(paths).flatMap { path => val uri = Utils.resolveURI(path) uri.getScheme match { case "local" | "http" | "https" | "ftp" => Array(path) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 77161c6e679da..e60027658472d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -403,16 +403,16 @@ object SparkSubmit extends CommandLineUtils with Logging { args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull args.files = Option(args.files).map { files => - files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",") + Utils.stringToSeq(files).map { downloadResource }.mkString(",") }.orNull - args.pyFiles = Option(args.pyFiles).map { files => - files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",") + args.pyFiles = Option(args.pyFiles).map { pyFiles => + Utils.stringToSeq(pyFiles).map { downloadResource }.mkString(",") }.orNull - args.jars = Option(args.jars).map { files => - files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",") + args.jars = Option(args.jars).map { jars => + Utils.stringToSeq(jars).map { downloadResource }.mkString(",") }.orNull - args.archives = Option(args.archives).map { files => - files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",") + args.archives = Option(args.archives).map { archives => + Utils.stringToSeq(archives).map { downloadResource }.mkString(",") }.orNull } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index bc08808a4d292..836e33c36d9a1 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2684,6 +2684,9 @@ private[spark] object Utils extends Logging { redact(redactionPattern, 
kvs.toArray) } + def stringToSeq(str: String): Seq[String] = { + str.split(",").map(_.trim()).filter(_.nonEmpty) + } } private[util] object CallerContext extends Logging { diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index f30243cd26c39..ad801bf8519a6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -905,12 +905,12 @@ class SparkSubmitSuite testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = true) } - test("force downloading remote resource if it's scheme is configured") { + test("force download from blacklisted schemes") { testRemoteResources(isHttpSchemeBlacklisted = true, supportMockHttpFs = true) } - private def testRemoteResources( - isHttpSchemeBlacklisted: Boolean, supportMockHttpFs: Boolean): Unit = { + private def testRemoteResources(isHttpSchemeBlacklisted: Boolean, + supportMockHttpFs: Boolean): Unit = { val hadoopConf = new Configuration() updateConfWithFakeS3Fs(hadoopConf) if (supportMockHttpFs) { @@ -940,30 +940,25 @@ class SparkSubmitSuite } ) - sys.props.put("spark.testing", "1") - try { - val appArgs = new SparkSubmitArguments(args) - val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3 + val appArgs = new SparkSubmitArguments(args) + val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3 - val jars = sysProps("spark.yarn.dist.jars").split(",").toSet + val jars = sysProps("spark.yarn.dist.jars").split(",").toSet - // The URI of remote S3 resource should still be remote. - assert(jars.contains(tmpS3JarPath)) + // The URI of remote S3 resource should still be remote. + assert(jars.contains(tmpS3JarPath)) - if (supportMockHttpFs) { - // If Http FS is supported by yarn service, the URI of remote http resource should - // still be remote. - assert(jars.contains(tmpHttpJarPath)) - } else { - // If Http FS is not supported by yarn service, or http scheme is configured to be force - // downloading, the URI of remote http resource should be changed to a local one. - val jarName = new File(tmpHttpJar.toURI).getName - val localHttpJar = jars.filter(_.contains(jarName)) - localHttpJar.size should be(1) - localHttpJar.head should startWith("file:") - } - } finally { - sys.props.remove("spark.testing") + if (supportMockHttpFs) { + // If Http FS is supported by yarn service, the URI of remote http resource should + // still be remote. + assert(jars.contains(tmpHttpJarPath)) + } else { + // If Http FS is not supported by yarn service, or http scheme is configured to be force + // downloading, the URI of remote http resource should be changed to a local one. 
+ val jarName = new File(tmpHttpJar.toURI).getName + val localHttpJar = jars.filter(_.contains(jarName)) + localHttpJar.size should be(1) + localHttpJar.head should startWith("file:") } } From 9538caeb31828c12d5940e72354bb042f03b11f7 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 15 Sep 2017 10:39:09 +0800 Subject: [PATCH 07/10] Style changes --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e60027658472d..d8dfa50d85241 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -403,16 +403,16 @@ object SparkSubmit extends CommandLineUtils with Logging { args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull args.files = Option(args.files).map { files => - Utils.stringToSeq(files).map { downloadResource }.mkString(",") + Utils.stringToSeq(files).map(downloadResource).mkString(",") }.orNull args.pyFiles = Option(args.pyFiles).map { pyFiles => - Utils.stringToSeq(pyFiles).map { downloadResource }.mkString(",") + Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",") }.orNull args.jars = Option(args.jars).map { jars => - Utils.stringToSeq(jars).map { downloadResource }.mkString(",") + Utils.stringToSeq(jars).map(downloadResource).mkString(",") }.orNull args.archives = Option(args.archives).map { archives => - Utils.stringToSeq(archives).map { downloadResource }.mkString(",") + Utils.stringToSeq(archives).map(downloadResource).mkString(",") }.orNull } From 915de61dd8d2e406410c65eaed00c33a3f22ca72 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 19 Sep 2017 10:25:03 +0800 Subject: [PATCH 08/10] Minor changes --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 +--- .../main/scala/org/apache/spark/internal/config/package.scala | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d8dfa50d85241..49b4d5560a2da 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -380,10 +380,8 @@ object SparkSubmit extends CommandLineUtils with Logging { val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES) def shouldDownload(scheme: String): Boolean = { - val isFsAvailable = () => { + forceDownloadSchemes.contains(scheme) || Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isSuccess - } - forceDownloadSchemes.contains(scheme) || !isFsAvailable() } def downloadResource(resource: String): String = { diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0fe4dfedd4f68..4cd12662f7df5 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -405,7 +405,8 @@ package object config { ConfigBuilder("spark.yarn.dist.forceDownloadSchemes") .doc("Comma-separated list of schemes for which files will be downloaded to the " + "local disk prior to being added to YARN's distributed cache. 
For use in cases " + - "where the YARN service does not support schemes that are supported by Spark.") + "where the YARN service does not support schemes that are supported by Spark, like http, " + + "https, ftp.") .stringConf .toSequence .createWithDefault(Nil) From 3bd487f739ae4f9dcc89cccdd727f334a8fe06a5 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 19 Sep 2017 13:37:53 +0800 Subject: [PATCH 09/10] Fix test failure --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 49b4d5560a2da..286a4379d2040 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -381,7 +381,7 @@ object SparkSubmit extends CommandLineUtils with Logging { def shouldDownload(scheme: String): Boolean = { forceDownloadSchemes.contains(scheme) || - Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isSuccess + Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure } def downloadResource(resource: String): String = { From 0fb79432f5483f3fb83360d634dd0a5aacade157 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 19 Sep 2017 15:11:42 +0800 Subject: [PATCH 10/10] Fix one left doc issue --- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- docs/running-on-yarn.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 4cd12662f7df5..44a2815b81a73 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -406,7 +406,7 @@ package object config { .doc("Comma-separated list of schemes for which files will be downloaded to the " + "local disk prior to being added to YARN's distributed cache. For use in cases " + "where the YARN service does not support schemes that are supported by Spark, like http, " + - "https, ftp.") + "https and ftp.") .stringConf .toSequence .createWithDefault(Nil) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 98781082cf074..432639588cc2b 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -217,7 +217,7 @@ To use a custom metrics.properties for the application master and executors, upd Comma-separated list of schemes for which files will be downloaded to the local disk prior to being added to YARN's distributed cache. For use in cases where the YARN service does not - support schemes that are supported by Spark. + support schemes that are supported by Spark, like http, https and ftp.
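
The net effect of the series: a remote resource is downloaded to the local disk before being added to YARN's distributed cache when its scheme is listed in spark.yarn.dist.forceDownloadSchemes, or when no Hadoop FileSystem implementation is registered for that scheme (the real check uses FileSystem.getFileSystemClass against the Hadoop configuration). A minimal standalone Scala sketch of that decision follows; it is illustrative only, and the scheme lists in it are assumptions rather than values taken from the patches.

object ForceDownloadSchemesSketch {
  // Mirrors the final shouldDownload decision: force-listed scheme, or no FileSystem for it.
  def shouldDownload(
      scheme: String,
      forceDownloadSchemes: Seq[String],
      fsAvailable: String => Boolean): Boolean = {
    forceDownloadSchemes.contains(scheme) || !fsAvailable(scheme)
  }

  def main(args: Array[String]): Unit = {
    val force = Seq("http", "https")                       // assumed spark.yarn.dist.forceDownloadSchemes
    val hadoopSchemes = Set("hdfs", "s3a", "file", "ftp")  // assumed FileSystem registrations
    Seq("http", "hdfs", "s3a", "gs").foreach { scheme =>
      val download = shouldDownload(scheme, force, s => hadoopSchemes.contains(s))
      println(s"$scheme -> download to local disk first: $download")
    }
  }
}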