Skip to content

Commit b4916d4

Browse files
xkrogenHyukjinKwon
authored andcommitted
[SPARK-35672][CORE][YARN][3.1] Pass user classpath entries to executors using config instead of command line
### What changes were proposed in this pull request? Refactor the logic for constructing the user classpath from `yarn.ApplicationMaster` into `yarn.Client` so that it can be leveraged on the executor side as well, instead of having the driver construct it and pass it to the executor via command-line arguments. A new method, `getUserClassPath`, is added to `CoarseGrainedExecutorBackend` which defaults to `Nil` (consistent with the existing behavior where non-YARN resource managers do not configure the user classpath). `YarnCoarseGrainedExecutorBackend` overrides this to construct the user classpath from the existing `APP_JAR` and `SECONDARY_JARS` configs. ### Why are the changes needed? User-provided JARs are made available to executors using a custom classloader, so they do not appear on the standard Java classpath. Instead, they are passed as a list to the executor which then creates a classloader out of the URLs. Currently in the case of YARN, this list of JARs is crafted by the Driver (in `ExecutorRunnable`), which then passes the information to the executors (`CoarseGrainedExecutorBackend`) by specifying each JAR on the executor command line as `--user-class-path /path/to/myjar.jar`. This can cause extremely long argument lists when there are many JARs, which can cause the OS argument length to be exceeded, typically manifesting as the error message: > /bin/bash: Argument list too long A [Google search](https://www.google.com/search?q=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22&oq=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22) indicates that this is not a theoretical problem and afflicts real users, including ours. Passing this list using the configurations instead resolves this issue. ### Does this PR introduce _any_ user-facing change? No, except for fixing the bug, allowing for larger JAR lists to be passed successfully. Configuration of JARs is identical to before. ### How was this patch tested? New unit tests were added in `YarnClusterSuite`. Also, we have been running a similar fix internally for 4 months with great success. Note that this is a backport of apache#32810 with minor conflicts around imports. Closes apache#33090 from xkrogen/xkrogen-SPARK-35672-classpath-scalable-branch-3.1. Authored-by: Erik Krogen <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 96fb04f commit b4916d4

File tree

9 files changed

+142
-52
lines changed

9 files changed

+142
-52
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ private[spark] class CoarseGrainedExecutorBackend(
5151
bindAddress: String,
5252
hostname: String,
5353
cores: Int,
54-
userClassPath: Seq[URL],
5554
env: SparkEnv,
5655
resourcesFileOpt: Option[String],
5756
resourceProfile: ResourceProfile)
@@ -114,7 +113,7 @@ private[spark] class CoarseGrainedExecutorBackend(
114113
*/
115114
private def createClassLoader(): MutableURLClassLoader = {
116115
val currentLoader = Utils.getContextOrSparkClassLoader
117-
val urls = userClassPath.toArray
116+
val urls = getUserClassPath.toArray
118117
if (env.conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)) {
119118
new ChildFirstURLClassLoader(urls, currentLoader)
120119
} else {
@@ -139,6 +138,8 @@ private[spark] class CoarseGrainedExecutorBackend(
139138
}
140139
}
141140

141+
def getUserClassPath: Seq[URL] = Nil
142+
142143
def extractLogUrls: Map[String, String] = {
143144
val prefix = "SPARK_LOG_URL_"
144145
sys.env.filterKeys(_.startsWith(prefix))
@@ -155,7 +156,7 @@ private[spark] class CoarseGrainedExecutorBackend(
155156
case RegisteredExecutor =>
156157
logInfo("Successfully registered with driver")
157158
try {
158-
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
159+
executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false,
159160
resources = _resources)
160161
driver.get.send(LaunchedExecutor(executorId))
161162
} catch {
@@ -368,15 +369,14 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
368369
cores: Int,
369370
appId: String,
370371
workerUrl: Option[String],
371-
userClassPath: mutable.ListBuffer[URL],
372372
resourcesFileOpt: Option[String],
373373
resourceProfileId: Int)
374374

375375
def main(args: Array[String]): Unit = {
376376
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
377377
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
378378
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
379-
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
379+
arguments.bindAddress, arguments.hostname, arguments.cores,
380380
env, arguments.resourcesFileOpt, resourceProfile)
381381
}
382382
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
@@ -459,7 +459,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
459459
var resourcesFileOpt: Option[String] = None
460460
var appId: String = null
461461
var workerUrl: Option[String] = None
462-
val userClassPath = new mutable.ListBuffer[URL]()
463462
var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
464463

465464
var argv = args.toList
@@ -490,9 +489,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
490489
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
491490
workerUrl = Some(value)
492491
argv = tail
493-
case ("--user-class-path") :: value :: tail =>
494-
userClassPath += new URL(value)
495-
argv = tail
496492
case ("--resourceProfileId") :: value :: tail =>
497493
resourceProfileId = value.toInt
498494
argv = tail
@@ -519,7 +515,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
519515
}
520516

521517
Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
522-
userClassPath, resourcesFileOpt, resourceProfileId)
518+
resourcesFileOpt, resourceProfileId)
523519
}
524520

525521
private def printUsageAndExit(classNameForEntry: String): Unit = {
@@ -537,7 +533,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
537533
| --resourcesFile <fileWithJSONResourceInformation>
538534
| --app-id <appid>
539535
| --worker-url <workerUrl>
540-
| --user-class-path <url>
541536
| --resourceProfileId <id>
542537
|""".stripMargin)
543538
// scalastyle:on println

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,8 @@ private[spark] class Executor(
877877
val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
878878
new File(uri.split("/").last).toURI.toURL
879879
}
880+
logInfo(s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " +
881+
urls.mkString("'", ",", "'"))
880882
if (userClassPathFirst) {
881883
new ChildFirstURLClassLoader(urls, currentLoader)
882884
} else {

core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.executor
1919

2020
import java.io.File
21-
import java.net.URL
2221
import java.nio.ByteBuffer
2322
import java.util.Properties
2423

@@ -56,7 +55,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
5655

5756
// we don't really use this, just need it to get at the parser function
5857
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
59-
4, Seq.empty[URL], env, None, resourceProfile)
58+
4, env, None, resourceProfile)
6059
withTempDir { tmpDir =>
6160
val testResourceArgs: JObject = ("" -> "")
6261
val ja = JArray(List(testResourceArgs))
@@ -77,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
7776
val env = createMockEnv(conf, serializer)
7877
// we don't really use this, just need it to get at the parser function
7978
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
80-
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
79+
4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
8180
withTempDir { tmpDir =>
8281
val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
8382
val ja = Extraction.decompose(Seq(ra))
@@ -111,7 +110,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
111110
val env = createMockEnv(conf, serializer)
112111
// we don't really use this, just need it to get at the parser function
113112
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
114-
4, Seq.empty[URL], env, None, resourceProfile)
113+
4, env, None, resourceProfile)
115114

116115
withTempDir { tmpDir =>
117116
val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -138,7 +137,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
138137
val env = createMockEnv(conf, serializer)
139138
// we don't really use this, just need it to get at the parser function
140139
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
141-
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
140+
4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
142141

143142
// not enough gpu's on the executor
144143
withTempDir { tmpDir =>
@@ -191,7 +190,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
191190
val env = createMockEnv(conf, serializer)
192191
// we don't really use this, just need it to get at the parser function
193192
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
194-
4, Seq.empty[URL], env, None, resourceProfile)
193+
4, env, None, resourceProfile)
195194

196195
// executor resources < required
197196
withTempDir { tmpDir =>
@@ -222,7 +221,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
222221

223222
// we don't really use this, just need it to get at the parser function
224223
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
225-
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
224+
4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
226225

227226
val parsedResources = backend.parseOrFindResources(None)
228227

@@ -269,7 +268,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
269268

270269
// we don't really use this, just need it to get at the parser function
271270
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
272-
4, Seq.empty[URL], env, None, resourceProfile)
271+
4, env, None, resourceProfile)
273272
val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
274273
val ja = Extraction.decompose(Seq(gpuArgs))
275274
val f1 = createTempJsonFile(dir, "resources", ja)
@@ -294,7 +293,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
294293
val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
295294
val env = createMockEnv(conf, serializer, Some(rpcEnv))
296295
backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
297-
"host1", "host1", 4, Seq.empty[URL], env, None,
296+
"host1", "host1", 4, env, None,
298297
resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
299298
assert(backend.taskResources.isEmpty)
300299

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package org.apache.spark.deploy.yarn
1919

20-
import java.io.{File, IOException}
20+
import java.io.IOException
2121
import java.lang.reflect.{InvocationTargetException, Modifier}
22-
import java.net.{URI, URL}
22+
import java.net.URI
2323
import java.security.PrivilegedExceptionAction
2424
import java.util.concurrent.{TimeoutException, TimeUnit}
2525

@@ -80,10 +80,7 @@ private[spark] class ApplicationMaster(
8080
private var metricsSystem: Option[MetricsSystem] = None
8181

8282
private val userClassLoader = {
83-
val classpath = Client.getUserClasspath(sparkConf)
84-
val urls = classpath.map { entry =>
85-
new URL("file:" + new File(entry.getPath()).getAbsolutePath())
86-
}
83+
val urls = Client.getUserClasspathUrls(sparkConf, isClusterMode)
8784

8885
if (isClusterMode) {
8986
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
package org.apache.spark.deploy.yarn
1919

2020
import java.io.{FileSystem => _, _}
21-
import java.net.{InetAddress, UnknownHostException, URI}
21+
import java.net.{InetAddress, UnknownHostException, URI, URL}
2222
import java.nio.ByteBuffer
2323
import java.nio.charset.StandardCharsets
24+
import java.nio.file.Paths
2425
import java.util.{Locale, Properties, UUID}
2526
import java.util.zip.{ZipEntry, ZipOutputStream}
2627

@@ -1267,7 +1268,7 @@ private[spark] class Client(
12671268

12681269
}
12691270

1270-
private object Client extends Logging {
1271+
private[spark] object Client extends Logging {
12711272

12721273
// Alias for the user jar
12731274
val APP_JAR_NAME: String = "__app__.jar"
@@ -1429,6 +1430,32 @@ private object Client extends Logging {
14291430
(mainUri ++ secondaryUris).toArray
14301431
}
14311432

1433+
/**
1434+
* Returns a list of local, absolute file URLs representing the user classpath. Note that this
1435+
* must be executed on the same host which will access the URLs, as it will resolve relative
1436+
* paths based on the current working directory.
1437+
*
1438+
* @param conf Spark configuration.
1439+
* @param useClusterPath Whether to use the 'cluster' path when resolving paths with the
1440+
* `local` scheme. This should be used when running on the cluster, but
1441+
* not when running on the gateway (i.e. for the driver in `client` mode).
1442+
* @return Array of local URLs ready to be passed to a [[java.net.URLClassLoader]].
1443+
*/
1444+
def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = {
1445+
Client.getUserClasspath(conf).map { uri =>
1446+
val inputPath = uri.getPath
1447+
val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) {
1448+
Client.getClusterPath(conf, inputPath)
1449+
} else {
1450+
// Any other URI schemes should have been resolved by this point
1451+
assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString),
1452+
"getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
1453+
inputPath
1454+
}
1455+
Paths.get(replacedFilePath).toAbsolutePath.toUri.toURL
1456+
}
1457+
}
1458+
14321459
private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
14331460
mainJar.flatMap { path =>
14341461
val uri = Utils.resolveURI(path)

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark.deploy.yarn
1919

20-
import java.io.File
2120
import java.nio.ByteBuffer
2221
import java.util.Collections
2322

@@ -188,16 +187,6 @@ private[yarn] class ExecutorRunnable(
188187
// For log4j configuration to reference
189188
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
190189

191-
val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
192-
val absPath =
193-
if (new File(uri.getPath()).isAbsolute()) {
194-
Client.getClusterPath(sparkConf, uri.getPath())
195-
} else {
196-
Client.buildPath(Environment.PWD.$(), uri.getPath())
197-
}
198-
Seq("--user-class-path", "file:" + absPath)
199-
}.toSeq
200-
201190
YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
202191
val commands = prefixEnv ++
203192
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
@@ -209,7 +198,6 @@ private[yarn] class ExecutorRunnable(
209198
"--cores", executorCores.toString,
210199
"--app-id", appId,
211200
"--resourceProfileId", resourceProfileId.toString) ++
212-
userClassPath ++
213201
Seq(
214202
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
215203
s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.net.URL
2121

2222
import org.apache.spark.SparkEnv
2323
import org.apache.spark.deploy.SparkHadoopUtil
24+
import org.apache.spark.deploy.yarn.Client
2425
import org.apache.spark.internal.Logging
2526
import org.apache.spark.resource.ResourceProfile
2627
import org.apache.spark.rpc.RpcEnv
@@ -38,7 +39,6 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
3839
bindAddress: String,
3940
hostname: String,
4041
cores: Int,
41-
userClassPath: Seq[URL],
4242
env: SparkEnv,
4343
resourcesFile: Option[String],
4444
resourceProfile: ResourceProfile)
@@ -49,13 +49,15 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
4949
bindAddress,
5050
hostname,
5151
cores,
52-
userClassPath,
5352
env,
5453
resourcesFile,
5554
resourceProfile) with Logging {
5655

5756
private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
5857

58+
override def getUserClassPath: Seq[URL] =
59+
Client.getUserClasspathUrls(env.conf, useClusterPath = true)
60+
5961
override def extractLogUrls: Map[String, String] = {
6062
YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
6163
.getOrElse(Map())
@@ -73,7 +75,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
7375
val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
7476
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
7577
new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
76-
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
78+
arguments.bindAddress, arguments.hostname, arguments.cores,
7779
env, arguments.resourcesFileOpt, resourceProfile)
7880
}
7981
val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
1919

2020
import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
2121
import java.net.URI
22+
import java.nio.file.Paths
2223
import java.util.Properties
2324
import java.util.concurrent.ConcurrentHashMap
2425

@@ -583,6 +584,40 @@ class ClientSuite extends SparkFunSuite with Matchers {
583584
}
584585
}
585586

587+
test("SPARK-35672: test Client.getUserClasspathUrls") {
588+
val gatewayRootPath = "/local/matching/replace"
589+
val replacementRootPath = "/replaced/path"
590+
val conf = new SparkConf()
591+
.set(SECONDARY_JARS, Seq(
592+
s"local:$gatewayRootPath/foo.jar",
593+
"local:/local/not/matching/replace/foo.jar",
594+
"file:/absolute/file/path/foo.jar",
595+
s"$gatewayRootPath/but-not-actually-local/foo.jar",
596+
"/absolute/path/foo.jar",
597+
"relative/path/foo.jar"
598+
))
599+
.set(GATEWAY_ROOT_PATH, gatewayRootPath)
600+
.set(REPLACEMENT_ROOT_PATH, replacementRootPath)
601+
602+
def assertUserClasspathUrls(cluster: Boolean, expectedReplacementPath: String): Unit = {
603+
val expectedUrls = Seq(
604+
Paths.get(APP_JAR_NAME).toAbsolutePath.toUri.toString,
605+
s"file:$expectedReplacementPath/foo.jar",
606+
"file:/local/not/matching/replace/foo.jar",
607+
"file:/absolute/file/path/foo.jar",
608+
// since this path wasn't a local URI, it should never be replaced
609+
s"file:$gatewayRootPath/but-not-actually-local/foo.jar",
610+
"file:/absolute/path/foo.jar",
611+
Paths.get("relative/path/foo.jar").toAbsolutePath.toUri.toString
612+
).map(URI.create(_).toURL).toArray
613+
assert(Client.getUserClasspathUrls(conf, cluster) === expectedUrls)
614+
}
615+
// assert that no replacement happens when cluster = false by expecting the replacement
616+
// path to be the same as the original path
617+
assertUserClasspathUrls(cluster = false, gatewayRootPath)
618+
assertUserClasspathUrls(cluster = true, replacementRootPath)
619+
}
620+
586621
private val matching = Seq(
587622
("files URI match test1", "file:///file1", "file:///file2"),
588623
("files URI match test2", "file:///c:file1", "file://c:file2"),

0 commit comments

Comments
 (0)