Commit 0faa3b6

Stash of adding config options in submit script and YARN
1 parent ac2d65e commit 0faa3b6

7 files changed: 69 additions & 15 deletions

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 30 additions & 6 deletions
@@ -17,11 +17,13 @@

 package org.apache.spark.deploy

-import java.io.{PrintStream, File}
+import java.io.{FileInputStream, PrintStream, File}
 import java.net.URL
+import java.util.Properties

 import org.apache.spark.executor.ExecutorURLClassLoader

+import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.Map

@@ -108,6 +110,21 @@ object SparkSubmit {
     val sysProps = new HashMap[String, String]()
     var childMainClass = ""

+    // Load system properties by default from the file, if present
+    if (appArgs.verbose) printStream.println(s"Using properties file: ${appArgs.propertiesFile}")
+    Option(appArgs.propertiesFile).map { filename =>
+      val file = new File(filename)
+      getDefaultProperties(file).foreach { case (k, v) =>
+        if (k.startsWith("spark")) {
+          sysProps(k) = v
+          if (appArgs.verbose) printStream.println(s"Adding default property: $k=$v")
+        }
+        else {
+          printWarning(s"Ignoring non-spark config property: $k=$v")
+        }
+      }
+    }
+
     if (clusterManager == MESOS && deployOnCluster) {
       printErrorAndExit("Mesos does not support running the driver on the cluster")
     }

@@ -191,11 +208,11 @@
       sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {

     if (verbose) {
-      System.err.println(s"Main class:\n$childMainClass")
-      System.err.println(s"Arguments:\n${childArgs.mkString("\n")}")
-      System.err.println(s"System properties:\n${sysProps.mkString("\n")}")
-      System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
-      System.err.println("\n")
+      printStream.println(s"Main class:\n$childMainClass")
+      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
+      printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
+      printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
+      printStream.println("\n")
     }

     val loader = new ExecutorURLClassLoader(new Array[URL](0),

@@ -224,6 +241,13 @@ object SparkSubmit {
     val url = localJarFile.getAbsoluteFile.toURI.toURL
     loader.addURL(url)
   }
+
+  private def getDefaultProperties(file: File): Seq[(String, String)] = {
+    val inputStream = new FileInputStream(file)
+    val properties = new Properties()
+    properties.load(inputStream)
+    properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
+  }
 }

 private[spark] class OptionAssigner(val value: String,
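
For reference, a minimal standalone sketch (not part of the commit) of the loading behaviour these hunks introduce: read a java.util.Properties file and keep only keys that start with "spark", warning on everything else. The object name, the file contents, and the use of JavaConverters instead of JavaConversions are illustrative assumptions.

import java.io.{File, FileInputStream, PrintWriter}
import java.util.Properties
import scala.collection.JavaConverters._

object DefaultsLoadingSketch {
  // Mirrors getDefaultProperties: read a properties file into key/value pairs.
  def loadProperties(file: File): Seq[(String, String)] = {
    val in = new FileInputStream(file)
    try {
      val props = new Properties()
      props.load(in)
      props.stringPropertyNames().asScala.toSeq.map(k => (k, props.getProperty(k)))
    } finally {
      in.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Write a throwaway defaults file so the example is self-contained.
    val file = File.createTempFile("spark-defaults", ".properties")
    val out = new PrintWriter(file)
    out.println("spark.executor.memory=2g")
    out.println("spark.yarn.queue=default")
    out.println("some.other.setting=ignored") // not a spark.* key
    out.close()

    // Keep only spark-prefixed keys, as the new SparkSubmit logic does.
    loadProperties(file).foreach { case (k, v) =>
      if (k.startsWith("spark")) println(s"Adding default property: $k=$v")
      else println(s"Ignoring non-spark config property: $k=$v")
    }
  }
}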

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 17 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy

 import scala.collection.mutable.ArrayBuffer
+import java.io.File

 /**
  * Parses and encapsulates arguments from the spark-submit script.

@@ -28,6 +29,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
   var executorMemory: String = null
   var executorCores: String = null
   var totalExecutorCores: String = null
+  var propertiesFile: String = null
   var driverMemory: String = null
   var driverCores: String = null
   var supervise: Boolean = false

@@ -49,6 +51,15 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
   if (args.length == 0) printUsageAndExit(-1)
   if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
   if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+  if (propertiesFile == null) {
+    val sparkHome = sys.env("SPARK_HOME") // defined via `spark-class`
+    val sep = File.separator
+    val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.properties"
+    val file = new File(defaultPath)
+    if (file.exists()) {
+      propertiesFile = file.getAbsolutePath
+    }
+  }

   override def toString = {
     s"""Parsed arguments:

@@ -57,8 +68,9 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
    |  executorMemory $executorMemory
    |  executorCores $executorCores
    |  totalExecutorCores $totalExecutorCores
+   |  propertiesFile $propertiesFile
    |  driverMemory $driverMemory
-   |  drivercores $driverCores
+   |  driverCores $driverCores
    |  supervise $supervise
    |  queue $queue
    |  numExecutors $numExecutors

@@ -122,6 +134,10 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
        driverCores = value
        parseOpts(tail)

+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parseOpts(tail)
+
       case ("--supervise") :: tail =>
         supervise = true
         parseOpts(tail)
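
A rough sketch (again, not from the commit) of the fallback this change adds: an explicit --properties-file wins, otherwise $SPARK_HOME/conf/spark-defaults.properties is used only if it exists. Unlike the commit, which reads SPARK_HOME via sys.env(...) and relies on spark-class having defined it, this sketch uses sys.env.get so it also runs when the variable is unset.

import java.io.File

object DefaultPropertiesPath {
  def resolve(explicit: Option[String]): Option[String] =
    explicit.orElse {
      sys.env.get("SPARK_HOME").flatMap { sparkHome =>
        val sep = File.separator
        val candidate = new File(s"$sparkHome${sep}conf${sep}spark-defaults.properties")
        if (candidate.exists()) Some(candidate.getAbsolutePath) else None
      }
    }

  def main(args: Array[String]): Unit = {
    println(resolve(explicit = Some("/tmp/my.properties"))) // explicit --properties-file wins
    println(resolve(explicit = None))                       // falls back to the SPARK_HOME default, if present
  }
}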

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 14 additions & 2 deletions
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.util.{Records, Apps}
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.ExecutorLauncher
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment


@@ -340,8 +341,19 @@ trait ClientBase extends Logging {
       JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
     }

-    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
-      JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
+
+    if (args.amClass == classOf[ExecutorLauncher].getName) {
+      // If we are being launched in client mode, forward the spark-conf options
+      // onto the executor launcher
+      for ((k, v) <- sparkConf.getAll) {
+        JAVA_OPTS += s"-D$k=$v"
+      }
+    } else {
+      // If we are being launched in standalone mode, capture and forward any spark
+      // system properties (e.g. set by spark-class).
+      for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
+        JAVA_OPTS += s"-D$k=$v"
+      }
     }

     if (!localResources.contains(ClientBase.LOG4J_PROP)) {
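
The two YARN changes in this commit share one pattern: turn spark-prefixed key/value pairs into -D JVM options for the child process. Below is a small illustrative sketch of that pattern (the object and example keys are made up; in the commit the pairs come from sparkConf.getAll in client mode and from sys.props filtered on the "spark" prefix otherwise). Note that neither the commit nor this sketch quotes or escapes values, so values containing spaces would need extra care.

object ForwardSparkProps {
  // Build -Dkey=value options from spark-prefixed pairs only.
  def toJavaOpts(props: Iterable[(String, String)]): Seq[String] =
    props.collect { case (k, v) if k.startsWith("spark") => s"-D$k=$v" }.toSeq

  def main(args: Array[String]): Unit = {
    val example = Seq("spark.executor.memory" -> "2g", "java.version" -> "skipped")
    println(toJavaOpts(example).mkString(" ")) // prints: -Dspark.executor.memory=2g
  }
}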

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala

Lines changed: 4 additions & 2 deletions
@@ -57,8 +57,10 @@ trait ExecutorRunnableUtil extends Logging {
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
     JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
-    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
-      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+
+    /* Pass on Spark properties to the driver. */
+    for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
+      JAVA_OPTS += s"-D$k=$v"
     }

     JAVA_OPTS += " -Djava.io.tmpdir=" +

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ import org.apache.spark.util.Utils
  */
 private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {

-  def this(sc: SparkContext) = this(sc, new Configuration())
+  def this(sc: SparkContext) = this(sc, sc.getConf)

   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster

 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher}
 import org.apache.spark.scheduler.TaskSchedulerImpl

 import scala.collection.mutable.ArrayBuffer

@@ -54,7 +54,7 @@ private[spark] class YarnClientSchedulerBackend(
       "--class", "notused",
       "--jar", null,
       "--args", hostport,
-      "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
+      "--am-class", classOf[ExecutorLauncher].getName
     )

     // process any optional arguments, given either as environment variables

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 1 addition & 1 deletion
@@ -237,7 +237,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   }

   def finishApplicationMaster(status: FinalApplicationStatus) {
-    logInfo("finish ApplicationMaster with " + status)
+    logInfo("finish ApplicationEMaster with " + status)
     amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
   }
