Closed
Changes from all commits (46 commits)
a7edf6c  Support mounting hostPath volumes for executors (Apr 5, 2018)
52f0d1c  Read mode for mounted volumes (Apr 5, 2018)
70d050e  Refactor (Apr 5, 2018)
463e9b5  Fix style (Apr 6, 2018)
0a21a19  Add unit tests (Apr 6, 2018)
87bcc9f  Update comment (Apr 6, 2018)
c72cbc1  Fix unit tests (Apr 6, 2018)
4f1e8b9  [SPARK-23871][ML][PYTHON] add python api for VectorAssembler handleIn… (huaxingao, Apr 10, 2018)
7c7570d  [SPARK-23944][ML] Add the set method for the two LSHModel (lu-wang-dl, Apr 11, 2018)
c7622be  [SPARK-23847][FOLLOWUP][PYTHON][SQL] Actually test [desc|acs]_nulls_[… (HyukjinKwon, Apr 11, 2018)
87611bb  [MINOR][DOCS] Fix R documentation generation instruction for roxygen2 (HyukjinKwon, Apr 11, 2018)
c604d65  [SPARK-23951][SQL] Use actual java class instead of string representa… (hvanhovell, Apr 11, 2018)
271c891  [SPARK-23960][SQL][MINOR] Mark HashAggregateExec.bufVars as transient (rednaxelafx, Apr 11, 2018)
653fe02  [SPARK-6951][CORE] Speed up parsing of event logs during listing. (Apr 11, 2018)
3cb8204  [SPARK-22941][CORE] Do not exit JVM when submit fails with in-process… (Apr 11, 2018)
75a1830  [SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M (jkbradley, Apr 11, 2018)
9d960de  typo rawPredicition changed to rawPrediction (JBauerKogentix, Apr 11, 2018)
e904dfa  Revert "[SPARK-23960][SQL][MINOR] Mark HashAggregateExec.bufVars as t… (gatorsmile, Apr 12, 2018)
6a2289e  [SPARK-23962][SQL][TEST] Fix race in currentExecutionIds(). (squito, Apr 12, 2018)
0b19122  [SPARK-23762][SQL] UTF8StringBuffer uses MemoryBlock (kiszk, Apr 12, 2018)
0f93b91  [SPARK-23751][FOLLOW-UP] fix build for scala-2.12 (WeichenXu123, Apr 12, 2018)
682002b  [SPARK-23867][SCHEDULER] use droppedCount in logWarning (Apr 13, 2018)
14291b0  [SPARK-23748][SS] Fix SS continuous process doesn't support SubqueryA… (jerryshao, Apr 13, 2018)
ab7b961  [SPARK-23942][PYTHON][SQL] Makes collect in PySpark as action for a q… (HyukjinKwon, Apr 13, 2018)
1018be4  [SPARK-23971] Should not leak Spark sessions across test suites (ericl, Apr 13, 2018)
4b07036  [SPARK-23815][CORE] Spark writer dynamic partition overwrite mode may… (Apr 13, 2018)
0323e61  [SPARK-23905][SQL] Add UDF weekday (yucai, Apr 13, 2018)
a83ae0d  [SPARK-22839][K8S] Refactor to unify driver and executor pod builder … (mccheah, Apr 13, 2018)
4dfd746  [SPARK-23896][SQL] Improve PartitioningAwareFileIndex (gengliangwang, Apr 13, 2018)
25892f3  [SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer (mgaido91, Apr 13, 2018)
558f31b  [SPARK-23963][SQL] Properly handle large number of columns in query o… (bersprockets, Apr 13, 2018)
cbb41a0  [SPARK-23966][SS] Refactoring all checkpoint file writing logic in a … (tdas, Apr 13, 2018)
73f2853  [SPARK-23979][SQL] MultiAlias should not be a CodegenFallback (viirya, Apr 14, 2018)
c096493  [SPARK-23956][YARN] Use effective RPC port in AM registration (gerashegalov, Apr 16, 2018)
6931022  [SPARK-23917][SQL] Add array_max function (mgaido91, Apr 16, 2018)
083cf22  [SPARK-21033][CORE][FOLLOW-UP] Update Spillable (wangyum, Apr 16, 2018)
5003736  [SPARK-9312][ML] Add RawPrediction, numClasses, and numFeatures for O… (lu-wang-dl, Apr 16, 2018)
0461482  [SPARK-21088][ML] CrossValidator, TrainValidationSplit support collec… (WeichenXu123, Apr 16, 2018)
c9bb4e1  Support mounting hostPath volumes for executors (Apr 5, 2018)
1cdd9c8  Read mode for mounted volumes (Apr 5, 2018)
ac25098  Refactor (Apr 5, 2018)
a56e80f  Fix style (Apr 6, 2018)
674a4df  Add unit tests (Apr 6, 2018)
49c5b01  Update comment (Apr 6, 2018)
79c30c4  Fix unit tests (Apr 6, 2018)
86afec3  Merge (Apr 16, 2018)
30 changes: 28 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -25,9 +25,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.{MutableURLClassLoader, Utils}

private[deploy] object DependencyUtils {
private[deploy] object DependencyUtils extends Logging {

def resolveMavenDependencies(
packagesExclusions: String,
@@ -75,7 +76,7 @@ private[deploy] object DependencyUtils {
def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
if (jars != null) {
for (jar <- jars.split(",")) {
SparkSubmit.addJarToClasspath(jar, loader)
addJarToClasspath(jar, loader)
}
}
}
@@ -151,6 +152,31 @@ private[deploy] object DependencyUtils {
}.mkString(",")
}

def addJarToClasspath(localJar: String, loader: MutableURLClassLoader): Unit = {
val uri = Utils.resolveURI(localJar)
uri.getScheme match {
case "file" | "local" =>
val file = new File(uri.getPath)
if (file.exists()) {
loader.addURL(file.toURI.toURL)
} else {
logWarning(s"Local jar $file does not exist, skipping.")
}
case _ =>
logWarning(s"Skip remote jar $uri.")
}
}

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
*/
def mergeFileLists(lists: String*): String = {
val merged = lists.filterNot(StringUtils.isBlank)
.flatMap(Utils.stringToSeq)
if (merged.nonEmpty) merged.mkString(",") else null
}

private def splitOnFragment(path: String): (URI, Option[String]) = {
val uri = Utils.resolveURI(path)
val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
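For context, a minimal sketch (not part of the diff) of how the two helpers now housed in DependencyUtils might be exercised. The object name and jar paths are hypothetical, and the caller sits in org.apache.spark.deploy only because DependencyUtils is private[deploy]:

package org.apache.spark.deploy

import java.net.URL

import org.apache.spark.util.MutableURLClassLoader

// Hypothetical caller used only to illustrate the relocated helpers.
object DependencyUtilsSketch {
  def main(args: Array[String]): Unit = {
    val loader = new MutableURLClassLoader(Array.empty[URL], getClass.getClassLoader)

    // "file" and "local" URIs are added to the class path when the jar exists;
    // anything else is skipped with a warning instead of aborting the submit.
    DependencyUtils.addJarToClasspath("/tmp/app-deps.jar", loader)

    // Null arguments mean "no files"; the result is one comma-separated string, or null.
    val merged = DependencyUtils.mergeFileLists("a.jar,b.jar", null, "c.jar")
    assert(merged == "a.jar,b.jar,c.jar")
  }
}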
318 changes: 161 additions & 157 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
Expand Up @@ -29,7 +29,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.io.Source
import scala.util.Try

import org.apache.spark.{SparkException, SparkUserAppException}
import org.apache.spark.deploy.SparkSubmitAction._
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
@@ -40,7 +42,7 @@ import org.apache.spark.util.Utils
* The env argument is used for testing.
*/
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
extends SparkSubmitArgumentsParser {
extends SparkSubmitArgumentsParser with Logging {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
@@ -85,8 +87,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
// scalastyle:off println
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
if (verbose) {
logInfo(s"Using properties file: $propertiesFile")
}
Option(propertiesFile).foreach { filename =>
val properties = Utils.getPropertiesFromFile(filename)
properties.foreach { case (k, v) =>
@@ -95,21 +98,16 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Property files may contain sensitive information, so redact before printing
if (verbose) {
Utils.redact(properties).foreach { case (k, v) =>
SparkSubmit.printStream.println(s"Adding default property: $k=$v")
logInfo(s"Adding default property: $k=$v")
}
}
}
// scalastyle:on println
defaultProperties
}

// Set parameters from command line arguments
try {
parse(args.asJava)
} catch {
case e: IllegalArgumentException =>
SparkSubmit.printErrorAndExit(e.getMessage())
}
parse(args.asJava)

// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
@@ -141,7 +139,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
sparkProperties.foreach { case (k, v) =>
if (!k.startsWith("spark.")) {
sparkProperties -= k
SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
logWarning(s"Ignoring non-spark config property: $k=$v")
}
}
}
@@ -215,10 +213,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
} catch {
case _: Exception =>
SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
error(s"Cannot load main class from JAR $primaryResource")
}
case _ =>
SparkSubmit.printErrorAndExit(
error(
s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
"Please specify a class through --class.")
}
@@ -248,6 +246,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case SUBMIT => validateSubmitArguments()
case KILL => validateKillArguments()
case REQUEST_STATUS => validateStatusRequestArguments()
case PRINT_VERSION =>
}
}

@@ -256,62 +255,61 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
printUsageAndExit(-1)
}
if (primaryResource == null) {
SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python or R file)")
error("Must specify a primary resource (JAR or Python or R file)")
}
if (mainClass == null && SparkSubmit.isUserJar(primaryResource)) {
SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
error("No main class set in JAR; please specify one with --class")
}
if (driverMemory != null
&& Try(JavaUtils.byteStringAsBytes(driverMemory)).getOrElse(-1L) <= 0) {
SparkSubmit.printErrorAndExit("Driver Memory must be a positive number")
error("Driver memory must be a positive number")
}
if (executorMemory != null
&& Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive number")
error("Executor memory must be a positive number")
}
if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Executor cores must be a positive number")
error("Executor cores must be a positive number")
}
if (totalExecutorCores != null && Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Total executor cores must be a positive number")
error("Total executor cores must be a positive number")
}
if (numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Number of executors must be a positive number")
error("Number of executors must be a positive number")
}
if (pyFiles != null && !isPython) {
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
error("--py-files given but primary resource is not a Python script")
}

if (master.startsWith("yarn")) {
val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
if (!hasHadoopEnv && !Utils.isTesting) {
throw new Exception(s"When running with master '$master' " +
error(s"When running with master '$master' " +
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
}
}

if (proxyUser != null && principal != null) {
SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
error("Only one of --proxy-user or --principal can be provided.")
}
}

private def validateKillArguments(): Unit = {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
"Killing submissions is only supported in standalone or Mesos mode!")
error("Killing submissions is only supported in standalone or Mesos mode!")
}
if (submissionToKill == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
error("Please specify a submission to kill.")
}
}

private def validateStatusRequestArguments(): Unit = {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
error(
"Requesting submission statuses is only supported in standalone or Mesos mode!")
}
if (submissionToRequestStatusFor == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
error("Please specify a submission to request status for.")
}
}

@@ -368,7 +366,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S

case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
error("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value

@@ -405,14 +403,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case KILL_SUBMISSION =>
submissionToKill = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
error(s"Action cannot be both $action and $KILL.")
}
action = KILL

case STATUS =>
submissionToRequestStatusFor = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
error(s"Action cannot be both $action and $REQUEST_STATUS.")
}
action = REQUEST_STATUS

@@ -444,7 +442,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
repositories = value

case CONF =>
val (confName, confValue) = SparkSubmit.parseSparkConfProperty(value)
val (confName, confValue) = SparkSubmitUtils.parseSparkConfProperty(value)
sparkProperties(confName) = confValue

case PROXY_USER =>
@@ -463,15 +461,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
verbose = true

case VERSION =>
SparkSubmit.printVersionAndExit()
action = SparkSubmitAction.PRINT_VERSION

case USAGE_ERROR =>
printUsageAndExit(1)

case _ =>
throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
error(s"Unexpected argument '$opt'.")
}
true
action != SparkSubmitAction.PRINT_VERSION
}

/**
Expand All @@ -482,7 +480,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
*/
override protected def handleUnknown(opt: String): Boolean = {
if (opt.startsWith("-")) {
SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
error(s"Unrecognized option '$opt'.")
}

primaryResource =
@@ -501,20 +499,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}

private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
// scalastyle:off println
val outStream = SparkSubmit.printStream
if (unknownParam != null) {
outStream.println("Unknown/unsupported param " + unknownParam)
logInfo("Unknown/unsupported param " + unknownParam)
}
val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
"""Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
|Usage: spark-submit --kill [submission ID] --master [spark://...]
|Usage: spark-submit --status [submission ID] --master [spark://...]
|Usage: spark-submit run-example [options] example-class [example args]""".stripMargin)
outStream.println(command)
logInfo(command)

val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
outStream.println(
logInfo(
s"""
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn,
@@ -596,12 +592,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
)

if (SparkSubmit.isSqlShell(mainClass)) {
outStream.println("CLI options:")
outStream.println(getSqlShellOptions())
logInfo("CLI options:")
logInfo(getSqlShellOptions())
}
// scalastyle:on println

SparkSubmit.exitFn(exitCode)
throw new SparkUserAppException(exitCode)
}

/**
Expand Down Expand Up @@ -655,4 +650,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
System.setErr(currentErr)
}
}

private def error(msg: String): Unit = throw new SparkException(msg)

}
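To illustrate the behavioural shift this diff makes (the SPARK-22941 goal of not killing the JVM on a failed submit), a rough sketch of an in-process caller. The object name, the bogus deploy mode, and the jar path are made up; the catch relies on the error() helper above throwing SparkException instead of invoking exitFn, and the sketch lives in org.apache.spark.deploy because SparkSubmitArguments is private[deploy]:

package org.apache.spark.deploy

import org.apache.spark.SparkException

// Hypothetical in-process caller: argument errors now surface as exceptions,
// so the JVM hosting the launcher is no longer terminated by a bad submit.
object InProcessSubmitSketch {
  def main(args: Array[String]): Unit = {
    try {
      // --deploy-mode only accepts "client" or "cluster", so parsing calls error(...)
      // and throws rather than exiting the process.
      new SparkSubmitArguments(Seq("--deploy-mode", "edge", "app.jar"))
    } catch {
      case e: SparkException =>
        println(s"submit rejected without exiting the JVM: ${e.getMessage}")
    }
  }
}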