core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -52,7 +52,6 @@ private[spark] class CoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
-    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFileOpt: Option[String],
     resourceProfile: ResourceProfile)
@@ -124,7 +123,7 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private def createClassLoader(): MutableURLClassLoader = {
     val currentLoader = Utils.getContextOrSparkClassLoader
-    val urls = userClassPath.toArray
+    val urls = getUserClassPath.toArray
     if (env.conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
@@ -149,6 +148,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
   }
 
+  def getUserClassPath: Seq[URL] = Nil
+
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
     sys.env.filterKeys(_.startsWith(prefix))
@@ -165,7 +166,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       try {
-        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
+        executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false,
           resources = _resources)
         driver.get.send(LaunchedExecutor(executorId))
       } catch {
@@ -385,15 +386,14 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       cores: Int,
       appId: String,
       workerUrl: Option[String],
-      userClassPath: mutable.ListBuffer[URL],
       resourcesFileOpt: Option[String],
       resourceProfileId: Int)
 
   def main(args: Array[String]): Unit = {
     val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
@@ -476,7 +476,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var resourcesFileOpt: Option[String] = None
     var appId: String = null
     var workerUrl: Option[String] = None
-    val userClassPath = new mutable.ListBuffer[URL]()
     var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
 
     var argv = args.toList
@@ -507,9 +506,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         // Worker url is used in spark standalone mode to enforce fate-sharing with worker
         workerUrl = Some(value)
         argv = tail
-      case ("--user-class-path") :: value :: tail =>
-        userClassPath += new URL(value)
-        argv = tail
       case ("--resourceProfileId") :: value :: tail =>
         resourceProfileId = value.toInt
         argv = tail
@@ -536,7 +532,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     }
 
     Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
-      userClassPath, resourcesFileOpt, resourceProfileId)
+      resourcesFileOpt, resourceProfileId)
   }
 
   private def printUsageAndExit(classNameForEntry: String): Unit = {
@@ -554,7 +550,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       | --resourcesFile <fileWithJSONResourceInformation>
       | --app-id <appid>
       | --worker-url <workerUrl>
-      | --user-class-path <url>
      | --resourceProfileId <id>
       |""".stripMargin)
     // scalastyle:on println
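The changes above replace the `userClassPath` constructor parameter with an overridable `getUserClassPath` accessor defaulting to `Nil`, so a cluster-manager-specific subclass can compute the classpath on the executor host rather than receive it over the command line. A minimal standalone sketch of the pattern (illustrative names, not Spark code):

import java.io.File
import java.net.URL

// Base backend: no user classpath unless a subclass provides one.
class BaseBackend {
  def getUserClassPath: Seq[URL] = Nil
}

// A cluster-specific backend resolves the classpath where the executor runs.
class ClusterBackend(jarPaths: Seq[String]) extends BaseBackend {
  override def getUserClassPath: Seq[URL] =
    jarPaths.map(p => new File(p).toURI.toURL)
}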
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -878,6 +878,8 @@ private[spark] class Executor(
     val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
       new File(uri.split("/").last).toURI.toURL
     }
+    logInfo(s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " +
+      urls.mkString("'", ",", "'"))
     if (userClassPathFirst) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
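The added log line uses `mkString("'", ",", "'")`, which wraps the whole list in single quotes and comma-separates the entries. A quick sketch of the resulting format, assuming hypothetical jar paths:

import java.net.URL

val urls = Array(new URL("file:/app/a.jar"), new URL("file:/app/b.jar"))
println(urls.mkString("'", ",", "'"))
// prints: 'file:/app/a.jar,file:/app/b.jar'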
core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.executor
 
 import java.io.File
-import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Properties
 
@@ -56,7 +55,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
     withTempDir { tmpDir =>
       val testResourceArgs: JObject = ("" -> "")
       val ja = JArray(List(testResourceArgs))
@@ -77,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
     withTempDir { tmpDir =>
       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
       val ja = Extraction.decompose(Seq(ra))
@@ -111,7 +110,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
 
     withTempDir { tmpDir =>
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -138,7 +137,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
     // not enough gpu's on the executor
     withTempDir { tmpDir =>
@@ -191,7 +190,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
 
     // executor resources < required
     withTempDir { tmpDir =>
@@ -222,7 +221,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
     val parsedResources = backend.parseOrFindResources(None)
 
@@ -269,7 +268,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
     val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
     val ja = Extraction.decompose(Seq(gpuArgs))
     val f1 = createTempJsonFile(dir, "resources", ja)
@@ -294,7 +293,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
     val env = createMockEnv(conf, serializer, Some(rpcEnv))
     backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
-      "host1", "host1", 4, Seq.empty[URL], env, None,
+      "host1", "host1", 4, env, None,
       resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
     assert(backend.taskResources.isEmpty)
 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{File, IOException}
+import java.io.IOException
 import java.lang.reflect.{InvocationTargetException, Modifier}
-import java.net.{URI, URL, URLEncoder}
+import java.net.{URI, URLEncoder}
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{TimeoutException, TimeUnit}
 
@@ -85,10 +85,7 @@ private[spark] class ApplicationMaster(
   private var metricsSystem: Option[MetricsSystem] = None
 
   private val userClassLoader = {
-    val classpath = Client.getUserClasspath(sparkConf)
-    val urls = classpath.map { entry =>
-      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
-    }
+    val urls = Client.getUserClasspathUrls(sparkConf, isClusterMode)
 
     if (isClusterMode) {
       if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.{FileSystem => _, _}
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URL}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -1308,7 +1308,7 @@ private[spark] class Client(
 
 }
 
-private object Client extends Logging {
+private[spark] object Client extends Logging {
 
   // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
@@ -1470,6 +1470,32 @@ private object Client extends Logging {
     (mainUri ++ secondaryUris).toArray
   }
 
+  /**
+   * Returns a list of local, absolute file URLs representing the user classpath. Note that this
+   * must be executed on the same host which will access the URLs, as it will resolve relative
+   * paths based on the current working directory.
+   *
+   * @param conf Spark configuration.
+   * @param useClusterPath Whether to use the 'cluster' path when resolving paths with the
+   *                       `local` scheme. This should be used when running on the cluster, but
+   *                       not when running on the gateway (i.e. for the driver in `client` mode).
+   * @return Array of local URLs ready to be passed to a [[java.net.URLClassLoader]].
+   */
+  def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = {
+    Client.getUserClasspath(conf).map { uri =>
+      val inputPath = uri.getPath
+      val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) {
+        Client.getClusterPath(conf, inputPath)
+      } else {
+        // Any other URI schemes should have been resolved by this point
+        assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString),
+          "getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
+        inputPath
+      }
+      Paths.get(replacedFilePath).toAbsolutePath.toUri.toURL
+    }
+  }
+
   private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
     mainJar.flatMap { path =>
       val uri = Utils.resolveURI(path)
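The resolution rule in `getUserClasspathUrls` has three cases: `local:` URIs are rewritten from the gateway path to the cluster path only when `useClusterPath` is true, bare paths and `file:` URIs pass through unchanged, and every result is absolutized against the calling JVM's working directory (hence the scaladoc warning about which host runs it). A standalone approximation of the per-entry logic, with the gateway-to-cluster replacement passed in directly instead of read from `SparkConf` (the real code delegates to `Client.getClusterPath`):

import java.net.{URI, URL}
import java.nio.file.Paths

def resolveEntry(
    uri: URI,
    useClusterPath: Boolean,
    gatewayRoot: String,
    clusterRoot: String): URL = {
  val isLocal = uri.getScheme == "local"
  val path =
    if (isLocal && useClusterPath) uri.getPath.replaceFirst(gatewayRoot, clusterRoot)
    else uri.getPath
  // Relative paths resolve against the current working directory.
  Paths.get(path).toAbsolutePath.toUri.toURL
}

// resolveEntry(new URI("local:/gw/libs/foo.jar"), useClusterPath = true, "/gw", "/cluster")
//   returns file:/cluster/libs/foo.jar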
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
 import java.nio.ByteBuffer
 import java.util.Collections
 
@@ -190,16 +189,6 @@ private[yarn] class ExecutorRunnable(
     // For log4j configuration to reference
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
-    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
-      val absPath =
-        if (new File(uri.getPath()).isAbsolute()) {
-          Client.getClusterPath(sparkConf, uri.getPath())
-        } else {
-          Client.buildPath(Environment.PWD.$(), uri.getPath())
-        }
-      Seq("--user-class-path", "file:" + absPath)
-    }.toSeq
-
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
     val commands = prefixEnv ++
       Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
@@ -211,7 +200,6 @@ private[yarn] class ExecutorRunnable(
       "--cores", executorCores.toString,
       "--app-id", appId,
       "--resourceProfileId", resourceProfileId.toString) ++
-      userClassPath ++
       Seq(
         s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
         s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
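With this block removed, the container launch command no longer grows with the user classpath; the executor now rebuilds the classpath from `SparkConf` via the overridden `getUserClassPath`. For illustration, this is how the removed code scaled the command line (hypothetical paths; very long argument lists can hit OS and YARN length limits, a motivation for resolving the classpath executor-side):

// Each user classpath entry previously expanded into two command-line tokens.
val jars = Seq("/app/libs/a.jar", "/app/libs/b.jar", "/app/libs/c.jar")
val oldStyleArgs = jars.flatMap(p => Seq("--user-class-path", "file:" + p))
// oldStyleArgs == Seq("--user-class-path", "file:/app/libs/a.jar",
//                     "--user-class-path", "file:/app/libs/b.jar",
//                     "--user-class-path", "file:/app/libs/c.jar")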
resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -21,6 +21,7 @@ import java.net.URL
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.Client
 import org.apache.spark.internal.Logging
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv
@@ -38,7 +39,6 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
-    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFile: Option[String],
     resourceProfile: ResourceProfile)
@@ -49,13 +49,15 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress,
     hostname,
     cores,
-    userClassPath,
     env,
     resourcesFile,
     resourceProfile) with Logging {
 
   private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
 
+  override def getUserClassPath: Seq[URL] =
+    Client.getUserClasspathUrls(env.conf, useClusterPath = true)
+
   override def extractLogUrls: Map[String, String] = {
     YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
       .getOrElse(Map())
@@ -73,7 +75,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
 import java.net.URI
+import java.nio.file.Paths
 import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
 
@@ -583,6 +584,40 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  test("SPARK-35672: test Client.getUserClasspathUrls") {
+    val gatewayRootPath = "/local/matching/replace"
+    val replacementRootPath = "/replaced/path"
+    val conf = new SparkConf()
+      .set(SECONDARY_JARS, Seq(
+        s"local:$gatewayRootPath/foo.jar",
+        "local:/local/not/matching/replace/foo.jar",
+        "file:/absolute/file/path/foo.jar",
+        s"$gatewayRootPath/but-not-actually-local/foo.jar",
+        "/absolute/path/foo.jar",
+        "relative/path/foo.jar"
+      ))
+      .set(GATEWAY_ROOT_PATH, gatewayRootPath)
+      .set(REPLACEMENT_ROOT_PATH, replacementRootPath)
+
+    def assertUserClasspathUrls(cluster: Boolean, expectedReplacementPath: String): Unit = {
+      val expectedUrls = Seq(
+        Paths.get(APP_JAR_NAME).toAbsolutePath.toUri.toString,
+        s"file:$expectedReplacementPath/foo.jar",
+        "file:/local/not/matching/replace/foo.jar",
+        "file:/absolute/file/path/foo.jar",
+        // since this path wasn't a local URI, it should never be replaced
+        s"file:$gatewayRootPath/but-not-actually-local/foo.jar",
+        "file:/absolute/path/foo.jar",
+        Paths.get("relative/path/foo.jar").toAbsolutePath.toUri.toString
+      ).map(URI.create(_).toURL).toArray
+      assert(Client.getUserClasspathUrls(conf, cluster) === expectedUrls)
+    }
+    // assert that no replacement happens when cluster = false by expecting the replacement
+    // path to be the same as the original path
+    assertUserClasspathUrls(cluster = false, gatewayRootPath)
+    assertUserClasspathUrls(cluster = true, replacementRootPath)
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),