From 522e3e85143235a57c94cdc618133e66715264de Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Sun, 18 Sep 2016 09:11:06 +0800
Subject: [PATCH] [SPARK-17605][SPARK_SUBMIT] Add options spark.usePython and
 spark.useR for applications that use both PySpark and SparkR
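
Currently SparkSubmit decides whether to handle Python and R dependencies
solely from the type of the primary resource, so a jar application that
also calls into PySpark or SparkR gets none of that handling (merging
resolved --packages into --py-files, building R packages found in --jars,
shipping the SparkR archive in YARN mode). Setting spark.usePython or
spark.useR to true opts such an application in explicitly.

A hypothetical invocation (the class, jar, and py-file names below are
illustrative only, not part of this patch):

  # spark.usePython / spark.useR are the options added by this patch;
  # com.example.MixedLanguageApp, deps.py, and mixed-app.jar are made up.
  bin/spark-submit \
    --class com.example.MixedLanguageApp \
    --conf spark.usePython=true \
    --conf spark.useR=true \
    --py-files deps.py \
    mixed-app.jar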
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 28 +++++++++++--------
 .../spark/deploy/SparkSubmitArguments.scala        |  4 +++
 2 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 80611658a164..d7f821d47fb1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -295,27 +295,31 @@ object SparkSubmit {
       Option(args.repositories), Option(args.ivyRepoPath), exclusions = exclusions)
     if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
       args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
-      if (args.isPython) {
+      if (args.usePython) {
         args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
       }
     }
 
     // install any R packages that may have been passed through --jars or --packages.
     // Spark Packages may contain R source code inside the jar.
-    if (args.isR && !StringUtils.isBlank(args.jars)) {
+    if (args.useR && !StringUtils.isBlank(args.jars)) {
       RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
     }
 
     // Require all python files to be local, so we can add them to the PYTHONPATH
     // In YARN cluster mode, python files are distributed as regular files, which can be non-local.
     // In Mesos cluster mode, non-local python files are automatically downloaded by Mesos.
-    if (args.isPython && !isYarnCluster && !isMesosCluster) {
-      if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
-        printErrorAndExit(s"Only local python files are supported: ${args.primaryResource}")
+    if (!isYarnCluster && !isMesosCluster) {
+      if (args.isPython) {
+        if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
+          printErrorAndExit(s"Only local python files are supported: ${args.primaryResource}")
+        }
       }
-      val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
-      if (nonLocalPyFiles.nonEmpty) {
-        printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles")
+      if (args.usePython) {
+        val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
+        if (nonLocalPyFiles.nonEmpty) {
+          printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles")
+        }
       }
     }
 
@@ -374,7 +378,7 @@ object SparkSubmit {
     // In YARN mode for an R app, add the SparkR package archive and the R package
     // archive containing all of the built R libraries to archives so that they can
     // be distributed with the job
-    if (args.isR && clusterManager == YARN) {
+    if (args.useR && clusterManager == YARN) {
       val sparkRPackagePath = RUtils.localSparkRPackagePath
       if (sparkRPackagePath.isEmpty) {
         printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
@@ -404,7 +408,7 @@ object SparkSubmit {
     }
 
     // TODO: Support distributing R packages with standalone cluster
-    if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
+    if (args.useR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
       printErrorAndExit("Distributing R packages with standalone cluster is not supported.")
     }
 
@@ -538,7 +542,7 @@ object SparkSubmit {
 
     // Let YARN know it's a pyspark app, so it distributes needed libraries.
     if (clusterManager == YARN) {
-      if (args.isPython) {
+      if (args.usePython) {
         sysProps.put("spark.yarn.isPython", "true")
       }
 
@@ -590,7 +594,7 @@ object SparkSubmit {
     if (isMesosCluster) {
       assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
       childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
-      if (args.isPython) {
+      if (args.usePython) {
         // Second argument is main class
         childArgs += (args.primaryResource, "")
         if (args.pyFiles != null) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index f1761e7c1ec9..4e2cf5de3ca1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -70,6 +70,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var proxyUser: String = null
   var principal: String = null
   var keytab: String = null
+  var usePython: Boolean = false
+  var useR: Boolean = false
 
   // Standalone cluster mode only
   var supervise: Boolean = false
@@ -186,6 +188,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
     keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
     principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
+    usePython = isPython || sparkProperties.getOrElse("spark.usePython", "false").toBoolean
+    useR = isR || sparkProperties.getOrElse("spark.useR", "false").toBoolean
 
     // Try to set main class from JAR if no --class argument is given
     if (mainClass == null && !isPython && !isR && primaryResource != null) {