
Commit bbdce05

Author: Marcelo Vanzin (committed)
[SPARK-1395] Fix "local:" URI support in Yarn mode (again).
Recent changes ignored the fact that paths may be defined with "local:" URIs, which means they need to be explicitly added to the classpath everywhere a remote process is started. This change fixes that by:

- Using the correct methods to add paths to the classpath
- Creating SparkConf settings for the Spark jar itself and for the user's jar
- Propagating those two settings to the remote processes where needed

This ensures that both in client and in cluster mode, the driver has the necessary info to build the executor's classpath and have things still work when they contain "local:" references.

On the cleanup front, I removed the hacky way that log4j configuration was being propagated to handle the "local:" case. It's much more cleanly (and generically) handled by using spark-submit arguments (--files to upload a config file, or setting spark.executor.extraJavaOptions to pass JVM arguments and use a local file).
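For reference, here is a minimal sketch of the configuration style this change enables. It is illustrative only and not part of the commit; the jar and log4j paths are made up. It assumes the Spark assembly already exists on every YARN node, so it is referenced with a "local:" URI via the new spark.yarn.jar setting, and the log4j configuration goes through ordinary Spark settings instead of the removed SPARK_LOG4J_CONF plumbing.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example paths. A "local:" URI means the file already exists on every
// YARN node, so nothing needs to be uploaded to the distributed cache for it.
val conf = new SparkConf()
  .setAppName("local-uri-example")
  // Introduced by this change: spark.yarn.jar replaces the deprecated SPARK_JAR env variable.
  .set("spark.yarn.jar", "local:/opt/spark/lib/spark-assembly.jar")
  // log4j config is now passed through regular Spark/spark-submit options: either add
  // "--files /etc/spark/log4j.properties" to spark-submit, or point the executor JVMs
  // at a file that is already local to them, as below.
  .set("spark.executor.extraJavaOptions",
    "-Dlog4j.configuration=file:/etc/spark/log4j.properties")

val sc = new SparkContext(conf)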
1 parent 777c595 commit bbdce05

4 files changed: 181 additions, 77 deletions


yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 96 additions & 69 deletions
@@ -154,7 +154,7 @@ trait ClientBase extends Logging {
   }
 
   /** Copy the file into HDFS if needed. */
-  private def copyRemoteFile(
+  private[yarn] def copyRemoteFile(
       dstDir: Path,
       originalPath: Path,
       replication: Short,
@@ -213,10 +213,10 @@ trait ClientBase extends Logging {
 
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
 
-    Map(
-      ClientBase.SPARK_JAR -> ClientBase.getSparkJar, ClientBase.APP_JAR -> args.userJar,
-      ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
-    ).foreach { case(destName, _localPath) =>
+    List(
+      (ClientBase.SPARK_JAR, ClientBase.sparkJar(sparkConf), ClientBase.CONF_SPARK_JAR),
+      (ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR)
+    ).foreach { case(destName, _localPath, confKey) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
         val localURI = new URI(localPath)
@@ -225,6 +225,8 @@ trait ClientBase extends Logging {
         val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
           destName, statCache)
+      } else {
+        sparkConf.set(confKey, localPath)
       }
     }
   }
@@ -246,6 +248,8 @@ trait ClientBase extends Logging {
           if (addToClasspath) {
             cachedSecondaryJarLinks += linkname
           }
+        } else if (addToClasspath) {
+          cachedSecondaryJarLinks += file.trim()
         }
       }
     }
@@ -265,14 +269,10 @@ trait ClientBase extends Logging {
     val env = new HashMap[String, String]()
 
     val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
-    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
-    ClientBase.populateClasspath(yarnConf, sparkConf, log4jConf, env, extraCp)
+    ClientBase.populateClasspath(args, yarnConf, sparkConf, env, extraCp)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
-    if (log4jConf != null) {
-      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
-    }
 
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
@@ -286,6 +286,7 @@ trait ClientBase extends Logging {
       env("SPARK_YARN_USER_ENV") = userEnvs
     }
 
+    logInfo(s"ApplicationMaster environment: $env")
     env
   }
 
@@ -364,7 +365,6 @@ trait ClientBase extends Logging {
       sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts)
       sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p")
     }
-    javaOpts += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
     val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
@@ -391,12 +391,31 @@
 object ClientBase extends Logging {
   val SPARK_JAR: String = "__spark__.jar"
   val APP_JAR: String = "__app__.jar"
-  val LOG4J_PROP: String = "log4j.properties"
-  val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
   val LOCAL_SCHEME = "local"
+  val CONF_SPARK_JAR = "spark.yarn.jar"
+  val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"
   val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
+  val ENV_SPARK_JAR = "SPARK_JAR"
 
-  def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
+  /**
+   * Find the user-defined Spark jar if configured, or return the jar containing this
+   * class if not.
+   *
+   * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the
+   * user environment if that is not found (for backwards compatibility).
+   */
+  def sparkJar(conf: SparkConf) = {
+    if (conf.contains(CONF_SPARK_JAR)) {
+      conf.get(CONF_SPARK_JAR)
+    } else if (System.getenv(ENV_SPARK_JAR) != null) {
+      logWarning(
+        s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " +
+        s"in favor of the $CONF_SPARK_JAR configuration variable.")
+      System.getenv(ENV_SPARK_JAR)
+    } else {
+      SparkContext.jarOfClass(this.getClass).head
+    }
+  }
 
   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = {
     val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
@@ -469,71 +488,75 @@ object ClientBase extends Logging {
     triedDefault.toOption
   }
 
+  def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
+      env: HashMap[String, String], extraClassPath: Option[String] = None) {
+    extraClassPath.foreach(addClasspathEntry(_, env))
+    addClasspathEntry(Environment.PWD.$(), env)
+
+    // Normally the users app.jar is last in case conflicts with spark jars
+    if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
+      addUserClasspath(args, sparkConf, env)
+      addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
+      ClientBase.populateHadoopClasspath(conf, env)
+    } else {
+      addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
+      ClientBase.populateHadoopClasspath(conf, env)
+      addUserClasspath(args, sparkConf, env)
+    }
+
+    // Append all class files and jar files under the working directory to the classpath.
+    addFileToClasspath("*", null, env)
+  }
 
   /**
-   * Returns the java command line argument for setting up log4j. If there is a log4j.properties
-   * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
-   * is checked.
+   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
+   * to the classpath.
    */
-  def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = {
-    var log4jConf = LOG4J_PROP
-    if (!localResources.contains(log4jConf)) {
-      log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match {
-        case conf: String =>
-          val confUri = new URI(conf)
-          if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) {
-            "file://" + confUri.getPath()
-          } else {
-            ClientBase.LOG4J_PROP
-          }
-        case null => "log4j-spark-container.properties"
+  private def addUserClasspath(args: ClientArguments, conf: SparkConf,
+      env: HashMap[String, String]) = {
+    if (args != null) {
+      addFileToClasspath(args.userJar, APP_JAR, env)
+      if (args.addJars != null) {
+        args.addJars.split(",").foreach { case file: String =>
+          addFileToClasspath(file, null, env)
+        }
       }
+    } else {
+      val userJar = conf.getOption(CONF_SPARK_USER_JAR).getOrElse(null)
+      addFileToClasspath(userJar, APP_JAR, env)
+
+      val cachedSecondaryJarLinks =
+        conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
+      cachedSecondaryJarLinks.foreach(jar => addFileToClasspath(jar, null, env))
    }
-    " -Dlog4j.configuration=" + log4jConf
   }
 
-  def populateClasspath(conf: Configuration, sparkConf: SparkConf, log4jConf: String,
-      env: HashMap[String, String], extraClassPath: Option[String] = None) {
-
-    if (log4jConf != null) {
-      // If a custom log4j config file is provided as a local: URI, add its parent directory to the
-      // classpath. Note that this only works if the custom config's file name is
-      // "log4j.properties".
-      val localPath = getLocalPath(log4jConf)
-      if (localPath != null) {
-        val parentPath = new File(localPath).getParent()
-        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath,
-          File.pathSeparator)
+  /**
+   * Adds the given path to the classpath, handling "local:" URIs correctly.
+   *
+   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
+   * name will be added to the classpath (relative to the job's work directory).
+   *
+   * If not a "local:" file and no alternate name, the environment is not modified.
+   *
+   * @param path Path to add to classpath (optional).
+   * @param fileName Alternate name for the file (optional).
+   * @param env Map holding the environment variables.
+   */
+  private def addFileToClasspath(path: String, fileName: String,
+      env: HashMap[String, String]) : Unit = {
+    if (path != null) {
+      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
+        val localPath = getLocalPath(path)
+        if (localPath != null) {
+          addClasspathEntry(localPath, env)
+          return
+        }
       }
     }
-
-    /** Add entry to the classpath. */
-    def addClasspathEntry(path: String) = YarnSparkHadoopUtil.addToEnvironment(env,
-      Environment.CLASSPATH.name, path, File.pathSeparator)
-    /** Add entry to the classpath. Interpreted as a path relative to the working directory. */
-    def addPwdClasspathEntry(entry: String) =
-      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + entry)
-
-    extraClassPath.foreach(addClasspathEntry)
-
-    val cachedSecondaryJarLinks =
-      sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
-      .filter(_.nonEmpty)
-    // Normally the users app.jar is last in case conflicts with spark jars
-    if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
-      addPwdClasspathEntry(APP_JAR)
-      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
-      addPwdClasspathEntry(SPARK_JAR)
-      ClientBase.populateHadoopClasspath(conf, env)
-    } else {
-      addPwdClasspathEntry(SPARK_JAR)
-      ClientBase.populateHadoopClasspath(conf, env)
-      addPwdClasspathEntry(APP_JAR)
-      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
+    if (fileName != null) {
+      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env);
    }
-    // Append all class files and jar files under the working directory to the classpath.
-    addClasspathEntry(Environment.PWD.$())
-    addPwdClasspathEntry("*")
   }
 
   /**
@@ -547,4 +570,8 @@ object ClientBase extends Logging {
     null
   }
 
+  private def addClasspathEntry(path: String, env: HashMap[String, String]) =
+    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
+      File.pathSeparator)
+
 }

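To make the new classpath resolution rule concrete, here is a small standalone sketch. It is not Spark code and the jar paths are invented; it only mirrors the decision that addFileToClasspath makes above: a "local:" URI contributes its literal filesystem path to the container's CLASSPATH, while any other URI is represented only by its alternate link name relative to the container's working directory, because the actual file is shipped through the distributed cache.

import java.net.URI

// Simplified, illustrative mirror of the addFileToClasspath decision above;
// "<PWD>" stands for the YARN container's working directory.
def classpathEntryFor(path: String, altName: String): Option[String] = {
  val uri = new URI(path)
  if ("local" == uri.getScheme) {
    Some(uri.getPath)                       // node-local file: use its path as-is
  } else {
    Option(altName).map(n => "<PWD>/" + n)  // uploaded file: reference it by link name
  }
}

// Hypothetical inputs:
classpathEntryFor("local:/opt/libs/dep.jar", null)          // Some(/opt/libs/dep.jar)
classpathEntryFor("hdfs:///user/me/app.jar", "__app__.jar") // Some(<PWD>/__app__.jar)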
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala

Lines changed: 1 addition & 7 deletions
@@ -58,7 +58,6 @@ trait ExecutorRunnableUtil extends Logging {
 
     javaOpts += "-Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-    javaOpts += ClientBase.getLog4jConfiguration(localResources)
 
     // Certain configs need to be passed here because they are needed before the Executor
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -166,13 +165,8 @@ trait ExecutorRunnableUtil extends Logging {
 
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
-
     val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
-    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
-    ClientBase.populateClasspath(yarnConf, sparkConf, log4jConf, env, extraCp)
-    if (log4jConf != null) {
-      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
-    }
+    ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
 
     // Allow users to specify some environment variables
     YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),

yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala

Lines changed: 83 additions & 1 deletion
@@ -17,20 +17,29 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.io.File
 import java.net.URI
 
+import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.mockito.Matchers._
+import org.mockito.Mockito._
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers._
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ HashMap => MutableHashMap }
 import scala.util.Try
 
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
 
 class ClientBaseSuite extends FunSuite {
 
@@ -68,6 +77,65 @@ class ClientBaseSuite extends FunSuite {
    }
   }
 
+  private val SPARK = "local:/sparkJar"
+  private val USER = "local:/userJar"
+  private val ADDED = "local:/addJar1,local:/addJar2,/addJar3"
+
+  test("Local jar URIs") {
+    val conf = new Configuration()
+    val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)
+    val env = new MutableHashMap[String, String]()
+    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+    ClientBase.populateClasspath(args, conf, sparkConf, env, None)
+
+    val jars = env("CLASSPATH").split(File.pathSeparator)
+    s"$SPARK,$USER,$ADDED".split(",").foreach({ jar =>
+      val uri = new URI(jar)
+      if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) {
+        jars should contain (uri.getPath())
+      } else {
+        jars should not contain (uri.getPath())
+      }
+    })
+    jars should not contain (ClientBase.SPARK_JAR)
+    jars should not contain (ClientBase.APP_JAR)
+  }
+
+  test("Jar path propagation through SparkConf") {
+    val conf = new Configuration()
+    val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)
+    val yarnConf = new YarnConfiguration()
+    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+    val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
+    doReturn(new Path("/")).when(client).copyRemoteFile(any(classOf[Path]),
+      any(classOf[Path]), any(classOf[Short]), any(classOf[Boolean]))
+
+    var tempDir = Files.createTempDir();
+    try {
+      client.prepareLocalResources(tempDir.getAbsolutePath())
+      sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER))
+
+      // The non-local path should be propagated by name only, since it will end up in the app's
+      // staging dir.
+      val expected = ADDED.split(",")
+        .map(p => {
+          val uri = new URI(p)
+          if (ClientBase.LOCAL_SCHEME == uri.getScheme()) {
+            p
+          } else {
+            Option(uri.getFragment()).getOrElse(new File(p).getName())
+          }
+        })
+        .mkString(",")
+
+      sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected))
    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =
@@ -109,4 +177,18 @@ class ClientBaseSuite extends FunSuite {
   def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B =
     Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults)
 
+  private class DummyClient(
+      val args: ClientArguments,
+      val conf: Configuration,
+      val sparkConf: SparkConf,
+      val yarnConf: YarnConfiguration) extends ClientBase {
+
+    override def calculateAMMemory(newApp: GetNewApplicationResponse): Int =
+      throw new UnsupportedOperationException()
+
+    override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit =
+      throw new UnsupportedOperationException()
+
+  }
+
 }

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 1 addition & 0 deletions
@@ -81,6 +81,7 @@ class ExecutorRunnable(
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
       localResources)
 
+    logInfo(s"Setting up executor with environment: $env")
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
 