Commit 761ebcd
Library path and classpath for drivers
1 parent 7cc70e4

14 files changed: +115 −37 lines

bin/spark-class

Lines changed: 1 addition & 0 deletions

@@ -98,6 +98,7 @@ fi

 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="$OUR_JAVA_OPTS"
+JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$_SPARK_LIBRARY_PATH"
 JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
 # Load extra JAVA_OPTS from conf/java-opts, if it exists
 if [ -e "$FWDIR/conf/java-opts" ] ; then

bin/spark-submit

Lines changed: 6 additions & 1 deletion

@@ -25,8 +25,13 @@ while (($#)); do
     DEPLOY_MODE=$2
   elif [ $1 = "--driver-memory" ]; then
     DRIVER_MEMORY=$2
+  elif [ $1 = "--driver-library-path" ]; then
+    export _SPARK_LIBRARY_PATH=$2
+  elif [ $1 = "--driver-class-path" ]; then
+    export SPARK_CLASSPATH="$SPARK_CLASSPATH:$2"
+  elif [ $1 = "--driver-java-options" ]; then
+    export SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $2"
   fi
-
   shift
 done
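
The new flags are consumed here before the remaining arguments are forwarded: --driver-library-path seeds _SPARK_LIBRARY_PATH, which the spark-class change above turns into -Djava.library.path, while --driver-class-path and --driver-java-options append to SPARK_CLASSPATH and SPARK_JAVA_OPTS. As a hypothetical example, ./bin/spark-submit --driver-library-path /opt/native --driver-class-path /opt/jars/extra.jar ... would make /opt/native visible to the driver's JNI loader and add the extra jar to its classpath.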

conf/spark-env.sh.template

Lines changed: 11 additions & 0 deletions

@@ -16,6 +16,17 @@
 # - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos
 # - SPARK_CLASSPATH, default classpath entries to append

+# Options read in YARN client mode
+# - SPARK_YARN_APP_JAR, Path to your application's JAR file (required)
+# - SPARK_WORKER_INSTANCES, Number of workers to start (Default: 2)
+# - SPARK_WORKER_CORES, Number of cores for the workers (Default: 1).
+# - SPARK_WORKER_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
+# - SPARK_MASTER_MEMORY, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
+# - SPARK_YARN_APP_NAME, The name of your application (Default: Spark)
+# - SPARK_YARN_QUEUE, The hadoop queue to use for allocation requests (Default: 'default')
+# - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job.
+# - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job.
+
 # Options for the daemons used in the standalone deploy mode:
 # - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
 # - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 8 additions & 1 deletion

@@ -54,8 +54,15 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
     System.getenv().foreach{case (k, v) => env(k) = v}

     val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
+    val classPathEntries = sys.props.get("spark.driver.classPath").toSeq.flatMap { cp =>
+      cp.split(java.io.File.pathSeparator)
+    }
+    val libraryPathEntries = sys.props.get("spark.driver.libraryPath").toSeq.flatMap { cp =>
+      cp.split(java.io.File.pathSeparator)
+    }
+    val javaOpts = sys.props.get("spark.driver.javaOpts").toSeq
     val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
-      driverArgs.driverOptions, env)
+      driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)

     val driverDescription = new DriverDescription(
       driverArgs.jarUrl,
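
The Option-to-Seq idiom keeps an unset property harmless: None flatMaps to nothing, so a missing spark.driver.classPath becomes an empty Seq rather than a null to guard against. A runnable sketch with a hypothetical property value:

    // Runnable sketch; the property value is hypothetical. On Unix,
    // File.pathSeparator is ":", so the value splits into two entries.
    object SplitDemo {
      def main(args: Array[String]): Unit = {
        sys.props("spark.driver.classPath") = "/opt/jars/a.jar:/opt/jars/b.jar"
        val entries = sys.props.get("spark.driver.classPath").toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }
        println(entries) // List(/opt/jars/a.jar, /opt/jars/b.jar)
        // With the property unset, the same expression yields an empty Seq.
      }
    }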

core/src/main/scala/org/apache/spark/deploy/Command.scala

Lines changed: 4 additions & 1 deletion

@@ -22,5 +22,8 @@ import scala.collection.Map
 private[spark] case class Command(
     mainClass: String,
     arguments: Seq[String],
-    environment: Map[String, String]) {
+    environment: Map[String, String],
+    classPathEntries: Seq[String],
+    libraryPathEntries: Seq[String],
+    javaOptions: Seq[String]) {
 }
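
The three new fields give every launcher a structured channel for driver classpath entries, native library paths, and raw JVM options, instead of encoding them in environment variables. A hypothetical construction, assuming the case class above is in scope (all values invented for illustration):

    // Hypothetical values; only the field names come from the case class above.
    val cmd = Command(
      mainClass = "org.apache.spark.deploy.worker.DriverWrapper",
      arguments = Seq("{{WORKER_URL}}", "com.example.MyApp"),
      environment = Map("SPARK_HOME" -> "/opt/spark"),
      classPathEntries = Seq("/opt/jars/extra.jar"),
      libraryPathEntries = Seq("/opt/native"),
      javaOptions = Seq("-verbose:gc"))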

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 10 additions & 1 deletion

@@ -65,7 +65,8 @@ object SparkSubmit {
   /**
    * @return
    *   a tuple containing the arguments for the child, a list of classpath
-   *   entries for the child, and the main class for the child
+   *   entries for the child, a list of system propertes, a list of env vars
+   *   and the main class for the child
    */
   private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
     ArrayBuffer[String], Map[String, String], String) = {

@@ -140,6 +141,14 @@ object SparkSubmit {

     val options = List[OptionAssigner](
       new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+
+      new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
+        sysProp = "spark.driver.classPath"),
+      new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
+        sysProp = "spark.driver.javaOpts"),
+      new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
+        sysProp = "spark.driver.libraryPath"),
+
       new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
       new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
       new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
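
OptionAssigner's implementation is outside this diff; roughly, each entry copies a user-supplied value into either a child command-line option or a system property when the cluster-manager bit mask and deploy mode match. A simplified, self-contained sketch of that dispatch (not Spark's actual code):

    import scala.collection.mutable

    // Simplified sketch, not Spark's actual OptionAssigner: a rule fires only
    // when its value is set and the cluster manager / deploy mode match.
    object AssignerSketch {
      val STANDALONE = 1
      val YARN = 2
      case class Rule(value: String, clusterManagers: Int,
          deployOnCluster: Boolean, sysProp: String)

      def main(args: Array[String]): Unit = {
        val sysProps = mutable.Map[String, String]()
        val rules = Seq(
          Rule("/opt/jars/extra.jar", STANDALONE | YARN, true, "spark.driver.classPath"),
          Rule(null, STANDALONE | YARN, true, "spark.driver.javaOpts")) // unset, skipped
        for (r <- rules) {
          if (r.value != null && (r.clusterManagers & YARN) != 0 && r.deployOnCluster) {
            sysProps(r.sysProp) = r.value
          }
        }
        println(sysProps) // contains spark.driver.classPath -> /opt/jars/extra.jar
      }
    }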

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 42 additions & 20 deletions

@@ -31,6 +31,9 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
   var totalExecutorCores: String = null
   var propertiesFile: String = null
   var driverMemory: String = null
+  var driverExtraClassPath: String = null
+  var driverExtraLibraryPath: String = null
+  var driverExtraJavaOptions: String = null
   var driverCores: String = null
   var supervise: Boolean = false
   var queue: String = null

@@ -63,25 +66,28 @@

   override def toString = {
     s"""Parsed arguments:
-    |  master                 $master
-    |  deployMode             $deployMode
-    |  executorMemory         $executorMemory
-    |  executorCores          $executorCores
-    |  totalExecutorCores     $totalExecutorCores
-    |  propertiesFile         $propertiesFile
-    |  driverMemory           $driverMemory
-    |  driverCores            $driverCores
-    |  supervise              $supervise
-    |  queue                  $queue
-    |  numExecutors           $numExecutors
-    |  files                  $files
-    |  archives               $archives
-    |  mainClass              $mainClass
-    |  primaryResource        $primaryResource
-    |  name                   $name
-    |  childArgs              [${childArgs.mkString(" ")}]
-    |  jars                   $jars
-    |  verbose                $verbose
+    |  master                  $master
+    |  deployMode              $deployMode
+    |  executorMemory          $executorMemory
+    |  executorCores           $executorCores
+    |  totalExecutorCores      $totalExecutorCores
+    |  propertiesFile          $propertiesFile
+    |  driverMemory            $driverMemory
+    |  driverCores             $driverCores
+    |  driverExtraClassPath    $driverExtraClassPath
+    |  driverExtraLibraryPath  $driverExtraLibraryPath
+    |  driverExtraJavaOptions  $driverExtraJavaOptions
+    |  supervise               $supervise
+    |  queue                   $queue
+    |  numExecutors            $numExecutors
+    |  files                   $files
+    |  archives                $archives
+    |  mainClass               $mainClass
+    |  primaryResource         $primaryResource
+    |  name                    $name
+    |  childArgs               [${childArgs.mkString(" ")}]
+    |  jars                    $jars
+    |  verbose                 $verbose
     """.stripMargin
   }

@@ -134,6 +140,18 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
         driverCores = value
         parseOpts(tail)

+      case ("--driver-class-path") :: value :: tail =>
+        driverExtraClassPath = value
+        parseOpts(tail)
+
+      case ("--driver-java-opts") :: value :: tail =>
+        driverExtraJavaOptions = value
+        parseOpts(tail)
+
+      case ("--driver-library-path") :: value :: tail =>
+        driverExtraLibraryPath = value
+        parseOpts(tail)
+
       case ("--properties-file") :: value :: tail =>
         propertiesFile = value
         parseOpts(tail)

@@ -194,11 +212,15 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
       |  --class CLASS_NAME     Name of your app's main class (required for Java apps).
       |  --arg ARG              Argument to be passed to your application's main class. This
       |                         option can be specified multiple times for multiple args.
-      |  --driver-memory MEM    Memory for driver (e.g. 1000M, 2G) (Default: 512M).
       |  --name NAME            The name of your application (Default: 'Spark').
       |  --jars JARS            A comma-separated list of local jars to include on the
       |                         driver classpath and that SparkContext.addJar will work
       |                         with. Doesn't work on standalone with 'cluster' deploy mode.
+      |  --driver-memory MEM    Memory for driver (e.g. 1000M, 2G) (Default: 512M).
+      |  --driver-java-opts     Extra Java options to pass to the driver
+      |  --driver-library-path  Extra library path entries to pass to the driver
+      |  --driver-class-path    Extra class path entries to pass to the driver
+      |
       |
       | Spark standalone with cluster deploy mode only:
       |  --driver-cores NUM     Cores for driver (Default: 1).
core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala

Lines changed: 2 additions & 2 deletions

@@ -49,8 +49,8 @@ private[spark] object TestClient {
     val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
       conf = conf, securityManager = new SecurityManager(conf))
     val desc = new ApplicationDescription(
-      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
-      Some("dummy-spark-home"), "ignored")
+      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(),
+      Seq(), Seq()), Some("dummy-spark-home"), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()

core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala

Lines changed: 10 additions & 1 deletion

@@ -47,14 +47,23 @@ object CommandUtils extends Logging {
    */
   def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
     val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
+    val libraryOpts =
+      if (command.libraryPathEntries.size > 0) {
+        val joined = command.libraryPathEntries.mkString(File.pathSeparator)
+        Seq(s"-Djava.library.path=$joined")
+      } else {
+        Seq()
+      }

     // Figure out our classpath with the external compute-classpath script
     val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
     val classPath = Utils.executeAndGetOutput(
       Seq(sparkHome + "/bin/compute-classpath" + ext),
       extraEnvironment=command.environment)
+    val userClassPath = command.classPathEntries.mkString(File.pathSeparator)
+    val classPathWithUser = classPath + File.pathSeparator + userClassPath

-    Seq("-cp", classPath) ++ memoryOpts
+    Seq("-cp", classPathWithUser) ++ libraryOpts ++ memoryOpts ++ command.javaOptions
   }

   /** Spawn a thread that will redirect a given stream to a file */
core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 10 additions & 6 deletions

@@ -20,7 +20,7 @@ package org.apache.spark.deploy.worker
 import java.io._

 import scala.collection.JavaConversions._
-import scala.collection.mutable.Map
+import scala.collection.Map

 import akka.actor.ActorRef
 import com.google.common.base.Charsets

@@ -74,13 +74,17 @@ private[spark] class DriverRunner(

         // Make sure user application jar is on the classpath
         // TODO: If we add ability to submit multiple jars they should also be added here
-        val env = Map(driverDesc.command.environment.toSeq: _*)
-        env("SPARK_CLASSPATH") = env.getOrElse("SPARK_CLASSPATH", "") + s":$localJarFilename"
-        val newCommand = Command(driverDesc.command.mainClass,
-          driverDesc.command.arguments.map(substituteVariables), env)
+        val classPath = driverDesc.command.classPathEntries ++ Seq(s":$localJarFilename")
+        val newCommand = Command(
+          driverDesc.command.mainClass,
+          driverDesc.command.arguments.map(substituteVariables),
+          driverDesc.command.environment,
+          classPath,
+          driverDesc.command.libraryPathEntries,
+          driverDesc.command.javaOptions)
         val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
           sparkHome.getAbsolutePath)
-        launchDriver(command, env, driverDir, driverDesc.supervise)
+        launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
       }
       catch {
         case e: Exception => finalException = Some(e)
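
The net effect: the worker no longer copies the environment into a mutable map and splices the application jar into SPARK_CLASSPATH; the jar now travels as an explicit classPathEntries element while the original environment passes through untouched, which is also why the scala.collection.mutable.Map import could be dropped.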
