
Commit 63470af

nezihyigitbasi authored and Marcelo Vanzin committed
[SPARK-15782][YARN] Fix spark.jars and spark.yarn.dist.jars handling
When `--packages` is specified with spark-shell, the classes from those packages cannot be found, which I think is due to some of the changes in SPARK-12343. Tested manually with both the Scala 2.10 and 2.11 REPLs.

vanzin davies can you guys please review?

Author: Marcelo Vanzin <[email protected]>
Author: Nezih Yigitbasi <[email protected]>

Closes #13709 from nezihyigitbasi/SPARK-15782.
1 parent f1bf0d2 commit 63470af

File tree

5 files changed: +59 −16 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -391,7 +391,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
 
-    _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+    _jars = Utils.getUserJars(_conf)
     _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
       .toSeq.flatten
 
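The fix suggests that in YARN mode user jars can be carried in spark.yarn.dist.jars, which the replaced one-liner never consulted: it only ever read spark.jars. A minimal standalone sketch of that old parsing (the object and method names here are illustrative, not Spark's):

object OldJarParsingSketch {
  // Standalone copy of the parsing logic from the removed line: split the
  // comma-separated "spark.jars" value and drop empty fragments.
  def oldJars(sparkJars: Option[String]): Seq[String] =
    sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten

  def main(args: Array[String]): Unit = {
    println(oldJars(Some("/tmp/a.jar,,/tmp/b.jar"))) // List(/tmp/a.jar, /tmp/b.jar)
    println(oldJars(None))                           // List()
  }
}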

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 25 additions & 0 deletions
@@ -2352,6 +2352,31 @@ private[spark] object Utils extends Logging {
     log.info(s"Started daemon with process name: ${Utils.getProcessName()}")
     SignalUtils.registerLogger(log)
   }
+
+  /**
+   * Unions two comma-separated lists of files and filters out empty strings.
+   */
+  def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = {
+    var allFiles = Set[String]()
+    leftList.foreach { value => allFiles ++= value.split(",") }
+    rightList.foreach { value => allFiles ++= value.split(",") }
+    allFiles.filter { _.nonEmpty }
+  }
+
+  /**
+   * In YARN mode this method returns a union of the jar files pointed by "spark.jars" and the
+   * "spark.yarn.dist.jars" properties, while in other modes it returns the jar files pointed by
+   * only the "spark.jars" property.
+   */
+  def getUserJars(conf: SparkConf): Seq[String] = {
+    val sparkJars = conf.getOption("spark.jars")
+    if (conf.get("spark.master") == "yarn") {
+      val yarnJars = conf.getOption("spark.yarn.dist.jars")
+      unionFileLists(sparkJars, yarnJars).toSeq
+    } else {
+      sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+    }
+  }
 }
 
 /**
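Because unionFileLists builds a Set, the union deduplicates jars listed in both properties and drops empty fragments. A runnable standalone sketch of the same logic (the helper is copied out of Utils so it compiles without Spark on the classpath):

object UnionFileListsSketch {
  // Same logic as the new Utils.unionFileLists, lifted out so it runs alone.
  def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = {
    var allFiles = Set[String]()
    leftList.foreach { value => allFiles ++= value.split(",") }
    rightList.foreach { value => allFiles ++= value.split(",") }
    allFiles.filter { _.nonEmpty }
  }

  def main(args: Array[String]): Unit = {
    // Duplicates collapse and empty fragments are dropped.
    println(unionFileLists(Some("/tmp/a.jar,/tmp/b.jar"), Some("/tmp/c.jar,/tmp/a.jar")))
    // -> Set(/tmp/a.jar, /tmp/b.jar, /tmp/c.jar)
    println(unionFileLists(Some(""), None)) // -> Set()
  }
}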

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 12 additions & 0 deletions
@@ -570,6 +570,18 @@ class SparkSubmitSuite
       appArgs.executorMemory should be ("2.3g")
     }
   }
+
+  test("comma separated list of files are unioned correctly") {
+    val left = Option("/tmp/a.jar,/tmp/b.jar")
+    val right = Option("/tmp/c.jar,/tmp/a.jar")
+    val emptyString = Option("")
+    Utils.unionFileLists(left, right) should be (Set("/tmp/a.jar", "/tmp/b.jar", "/tmp/c.jar"))
+    Utils.unionFileLists(emptyString, emptyString) should be (Set.empty)
+    Utils.unionFileLists(Option("/tmp/a.jar"), emptyString) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(emptyString, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(None, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(Option("/tmp/a.jar"), None) should be (Set("/tmp/a.jar"))
+  }
   // scalastyle:on println
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.

repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala

Lines changed: 20 additions & 12 deletions
@@ -201,10 +201,10 @@ class SparkILoop(
       if (Utils.isWindows) {
         // Strip any URI scheme prefix so we can add the correct path to the classpath
         // e.g. file:/C:/my/path.jar -> C:/my/path.jar
-        SparkILoop.getAddedJars.map { jar => new URI(jar).getPath.stripPrefix("/") }
+        getAddedJars().map { jar => new URI(jar).getPath.stripPrefix("/") }
       } else {
         // We need new URI(jar).getPath here for the case that `jar` includes encoded white space (%20).
-        SparkILoop.getAddedJars.map { jar => new URI(jar).getPath }
+        getAddedJars().map { jar => new URI(jar).getPath }
       }
     // work around for Scala bug
     val totalClassPath = addedJars.foldLeft(

@@ -1005,7 +1005,7 @@
   @DeveloperApi
   def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    val jars = SparkILoop.getAddedJars
+    val jars = getAddedJars()
     val conf = new SparkConf()
       .setMaster(getMaster())
       .setJars(jars)

@@ -1060,22 +1060,30 @@
 
   @deprecated("Use `process` instead", "2.9.0")
   private def main(settings: Settings): Unit = process(settings)
-}
 
-object SparkILoop extends Logging {
-  implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp
-  private def echo(msg: String) = Console println msg
-
-  def getAddedJars: Array[String] = {
+  private[repl] def getAddedJars(): Array[String] = {
+    val conf = new SparkConf().setMaster(getMaster())
     val envJars = sys.env.get("ADD_JARS")
     if (envJars.isDefined) {
       logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead")
     }
-    val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) }
-    val jars = propJars.orElse(envJars).getOrElse("")
+    val jars = {
+      val userJars = Utils.getUserJars(conf)
+      if (userJars.isEmpty) {
+        envJars.getOrElse("")
+      } else {
+        userJars.mkString(",")
+      }
+    }
     Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
   }
 
+}
+
+object SparkILoop extends Logging {
+  implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp
+  private def echo(msg: String) = Console println msg
+
   // Designed primarily for use by test code: take a String with a
   // bunch of code, and prints out a transcript of what it would look
   // like if you'd just typed it into the repl.

@@ -1109,7 +1117,7 @@ object SparkILoop extends Logging {
     if (settings.classpath.isDefault)
       settings.classpath.value = sys.props("java.class.path")
 
-    getAddedJars.map(jar => new URI(jar).getPath).foreach(settings.classpath.append(_))
+    repl.getAddedJars().map(jar => new URI(jar).getPath).foreach(settings.classpath.append(_))
 
     repl process settings
   }
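The reworked getAddedJars() gives configured user jars precedence over the deprecated ADD_JARS environment variable, which is now only a fallback. A minimal standalone sketch of that precedence (URI resolution via Utils.resolveURIs is elided, and the names here are illustrative):

object AddedJarsPrecedenceSketch {
  // Same fallback shape as the reworked method: if any user jars are
  // configured they win; otherwise fall back to the env-provided list.
  def addedJars(userJars: Seq[String], envJars: Option[String]): Array[String] = {
    val jars = if (userJars.isEmpty) envJars.getOrElse("") else userJars.mkString(",")
    jars.split(",").filter(_.nonEmpty)
  }

  def main(args: Array[String]): Unit = {
    println(addedJars(Seq("/tmp/a.jar"), Some("/tmp/env.jar")).toList) // List(/tmp/a.jar)
    println(addedJars(Nil, Some("/tmp/env.jar")).toList)               // List(/tmp/env.jar)
    println(addedJars(Nil, None).toList)                               // List()
  }
}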

repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala

Lines changed: 1 addition & 3 deletions
@@ -54,9 +54,7 @@ object Main extends Logging {
   // Visible for testing
   private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
     interp = _interp
-    val jars = conf.getOption("spark.jars")
-      .map(_.replace(",", File.pathSeparator))
-      .getOrElse("")
+    val jars = Utils.getUserJars(conf).mkString(File.pathSeparator)
     val interpArguments = List(
       "-Yrepl-class-based",
       "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
