
Commit bc083e3

Overload RegisteredExecutor to send tokens. Minor doc updates.
1 parent 7b19643 commit bc083e3

6 files changed: +23 -21 lines

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 9 additions & 9 deletions
@@ -55,7 +55,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     logInfo("Connecting to driver: " + driverUrl)
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       driver = Some(ref)
-      ref.sendWithReply[RegisteredExecutor.type](
+      ref.sendWithReply[RegisteredExecutor](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
     } onComplete {
       case Success(msg) => Utils.tryLogNonFatalError {
@@ -72,17 +72,17 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredExecutor =>
+    case RegisteredExecutor(tokens) =>
       logInfo("Successfully registered with driver")
       val (hostname, _) = Utils.parseHostPort(hostPort)
       executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
-
-    case NewTokens(tokens) =>
-      val inStream = new DataInputStream(new ByteArrayInputStream(tokens.value.array()))
-      val creds = new Credentials()
-      creds.readFields(inStream)
-      inStream.close()
-      UserGroupInformation.getCurrentUser.addCredentials(creds)
+      tokens.foreach { tokenBuffer =>
+        val inStream = new DataInputStream(new ByteArrayInputStream(tokenBuffer.value.array()))
+        val creds = new Credentials()
+        creds.readFields(inStream)
+        inStream.close()
+        UserGroupInformation.getCurrentUser.addCredentials(creds)
+      }
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 2 additions & 1 deletion
@@ -35,7 +35,8 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
 
-  case object RegisteredExecutor extends CoarseGrainedClusterMessage
+  case class RegisteredExecutor(tokens: Option[SerializableBuffer])
+    extends CoarseGrainedClusterMessage
 
   case class NewTokens(tokens: SerializableBuffer) extends CoarseGrainedClusterMessage
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 2 deletions
@@ -125,8 +125,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
       } else {
         logInfo("Registered executor: " + executorRef + " with ID " + executorId)
-        context.reply(RegisteredExecutor)
-        latestTokens.foreach(x => context.reply(NewTokens(x)))
+        context.reply(RegisteredExecutor(latestTokens))
         addressToExecutorId(executorRef.address) = executorId
         totalCoreCount.addAndGet(cores)
         totalRegisteredExecutors.addAndGet(1)
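
A single reply now carries both the registration acknowledgement and any tokens, since `latestTokens` is an `Option[SerializableBuffer]`. A hedged sketch of how the driver side might keep that field fresh when a `NewTokens` message arrives; the field name comes from this diff, but the handler placement is assumed, not shown in this commit:

    // Assumed driver-side bookkeeping (only latestTokens itself appears in
    // this commit's diff): remember the newest token buffer so executors
    // that register later get it in their RegisteredExecutor reply.
    private var latestTokens: Option[SerializableBuffer] = None

    override def receive: PartialFunction[Any, Unit] = {
      case NewTokens(tokens) =>
        latestTokens = Some(tokens)
    }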

docs/security.md

Lines changed: 2 additions & 1 deletion
@@ -31,7 +31,8 @@ SSL must be configured on each node and configured for each component involved i
 
 ### YARN mode
 The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.
-For long-running apps like Spark Streaming apps be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master securely via the Hadoop Distributed Cache. The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS.
+
+For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS.
 
 ### Standalone mode
 The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.
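
As a concrete illustration of the documented parameters, a submission might look like the following; the principal, keytab path, application class, and jar are placeholders, not taken from this commit:

    ./bin/spark-submit \
      --master yarn-cluster \
      --principal user@EXAMPLE.COM \
      --keytab /path/to/user.keytab \
      --class com.example.StreamingApp \
      streaming-app.jar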

yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala

Lines changed: 8 additions & 7 deletions
@@ -78,13 +78,14 @@ private[yarn] class AMDelegationTokenRenewer(
     val credentials = UserGroupInformation.getCurrentUser.getCredentials
     val interval = (0.75 * (hadoopUtil.getLatestTokenValidity(credentials) -
       System.currentTimeMillis())).toLong
-    // If only 6 hours left, then force a renewal immediately. This is to avoid tokens with
-    // very less validity being used on AM restart.
-    if ((interval millis).toHours <= 6) {
-      0L
-    } else {
-      interval
-    }
+    // // If only 6 hours left, then force a renewal immediately. This is to avoid tokens with
+    // // very less validity being used on AM restart.
+    // if ((interval millis).toHours <= 6) {
+    //   0L
+    // } else {
+    //   interval
+    // }
+    interval
   }
 
   def scheduleRenewal(runnable: Runnable): Unit = {
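
A quick worked example of the retained computation, with hypothetical numbers: if `getLatestTokenValidity` returns a timestamp 24 hours in the future, the renewal is scheduled at 75% of that window.

    // Hypothetical values, for illustration only.
    val validityWindowMs = 24L * 60 * 60 * 1000        // 86,400,000 ms left
    val intervalMs = (0.75 * validityWindowMs).toLong  // 64,800,000 ms = 18 hours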

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ import org.scalatest.{FunSuite, Matchers}
 
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType
 
-import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.util.Utils