Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,15 @@ object SparkEnv extends Logging {
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
bindAddress: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
val env = create(
conf,
executorId,
hostname,
bindAddress,
hostname,
None,
isLocal,
Expand All @@ -214,6 +215,17 @@ object SparkEnv extends Logging {
env
}

private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
createExecutorEnv(conf, executorId, hostname,
hostname, numCores, ioEncryptionKey, isLocal)
}

/**
* Helper method to create a SparkEnv for a driver or an executor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
bindAddress: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
Expand Down Expand Up @@ -227,6 +228,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case class Arguments(
driverUrl: String,
executorId: String,
bindAddress: String,
hostname: String,
cores: Int,
appId: String,
Expand All @@ -238,7 +240,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, Arguments, SparkEnv) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
Expand All @@ -259,10 +261,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val executorConf = new SparkConf
val fetcher = RpcEnv.create(
"driverPropsFetcher",
arguments.bindAddress,
arguments.hostname,
-1,
executorConf,
new SecurityManager(executorConf),
numUsableCores = 0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this line in this PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is needed unless we add extra method in RpcEnv object.

clientMode = true)

var driver: RpcEndpointRef = null
Expand Down Expand Up @@ -297,8 +301,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}

driverConf.set(EXECUTOR_ID, arguments.executorId)
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.hostname,
arguments.cores, cfg.ioEncryptionKey, isLocal = false)
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))
arguments.workerUrl.foreach { url =>
Expand All @@ -311,6 +315,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
var driverUrl: String = null
var executorId: String = null
var bindAddress: String = null
var hostname: String = null
var cores: Int = 0
var resourcesFileOpt: Option[String] = None
Expand All @@ -327,6 +332,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case ("--executor-id") :: value :: tail =>
executorId = value
argv = tail
case ("--bind-address") :: value :: tail =>
bindAddress = value
argv = tail
case ("--hostname") :: value :: tail =>
hostname = value
argv = tail
Expand Down Expand Up @@ -364,7 +372,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
printUsageAndExit(classNameForEntry)
}

Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl,
if (bindAddress == null) {
bindAddress = hostname
Copy link
Member

@dongjoon-hyun dongjoon-hyun Nov 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a new test case for this code path, @nishchalv .

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun Added couple of test cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

}

Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
userClassPath, resourcesFileOpt)
}

Expand All @@ -377,6 +389,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
| Options are:
| --driver-url <driverUrl>
| --executor-id <executorId>
| --bind-address <bindAddress>
| --hostname <hostname>
| --cores <cores>
| --resourcesFile <fileWithJSONResourceInformation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)

// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
withTempDir { tmpDir =>
val testResourceArgs: JObject = ("" -> "")
Expand All @@ -76,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
withTempDir { tmpDir =>
val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
Expand All @@ -101,7 +101,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)

withTempDir { tmpDir =>
Expand Down Expand Up @@ -129,7 +129,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)

// not enough gpu's on the executor
Expand Down Expand Up @@ -168,7 +168,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)

// executor resources < required
Expand Down Expand Up @@ -200,7 +200,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)

// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)

val parsedResources = backend.parseOrFindResources(None)
Expand All @@ -226,7 +226,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)

// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
val ja = Extraction.decompose(Seq(gpuArgs))
Expand Down Expand Up @@ -254,7 +254,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
val env = createMockEnv(conf, serializer, Some(rpcEnv))
backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
"host1", 4, Seq.empty[URL], env, None)
"host1", "host1", 4, Seq.empty[URL], env, None)
assert(backend.taskResources.isEmpty)

val taskId = 1000000
Expand Down Expand Up @@ -289,6 +289,31 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
}
}

test("SPARK-24203 when bindAddress is not set, it defaults to hostname") {
val args1 = Array(
"--driver-url", "driverurl",
"--executor-id", "1",
"--hostname", "host1",
"--cores", "1",
"--app-id", "app1")

val arg = CoarseGrainedExecutorBackend.parseArguments(args1, "")
assert(arg.bindAddress == "host1")
}

test("SPARK-24203 when bindAddress is different, it does not default to hostname") {
val args1 = Array(
"--driver-url", "driverurl",
"--executor-id", "1",
"--hostname", "host1",
"--bind-address", "bindaddress1",
"--cores", "1",
"--app-id", "app1")

val arg = CoarseGrainedExecutorBackend.parseArguments(args1, "")
assert(arg.bindAddress == "bindaddress1")
}

private def createMockEnv(conf: SparkConf, serializer: JavaSerializer,
rpcEnv: Option[RpcEnv] = None): SparkEnv = {
val mockEnv = mock[SparkEnv]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
bindAddress: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
Expand All @@ -43,6 +44,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
rpcEnv,
driverUrl,
executorId,
bindAddress,
hostname,
cores,
userClassPath,
Expand All @@ -68,7 +70,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt)
}
val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
Expand Down